
Non querying tasks shouldn't use processing buffers / merge buffers #16887

Merged · 10 commits merged into apache:master on Sep 10, 2024

Conversation

@LakshSingla LakshSingla commented Aug 13, 2024

Description

Tasks that do not support querying or query processing, i.e. tasks with supportsQueries = false, do not require processing threads, processing buffers, or merge buffers.

The following tasks don't support queries:

  1. Native batch ingestion tasks and subtasks
  2. Native compaction tasks and subtasks
  3. MSQ Controller tasks - MSQControllerTask
  4. Other miscellaneous tasks - ArchiveTask, KillUnusedSegmentsTask, MoveTask, RestoreTask
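For illustration only (a hypothetical class, not one of the tasks above, with the remaining Task methods omitted): a task opts out of query support by overriding supportsQueries(), which is the flag this PR keys off.

public class MyNonQueryingTask extends AbstractTask
{
  // Hypothetical example: returning false here tells the runner/peon that this
  // task never serves queries, so it needs no processing threads or buffers.
  @Override
  public boolean supportsQueries()
  {
    return false;
  }

  // getType(), isReady(), runTask(), etc. omitted for brevity.
}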

Release note

Reduces the direct memory requirement of non-query-processing tasks by not reserving query buffers for them.


This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious to an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@LakshSingla LakshSingla marked this pull request as ready for review August 13, 2024 08:42
@cryptoe cryptoe left a comment


Changes LGTM

@@ -364,6 +364,13 @@ public TaskStatus call()
command.addSystemProperty("druid.indexer.task.baseTaskDir", storageSlot.getDirectory().getAbsolutePath());
command.addSystemProperty("druid.indexer.task.tmpStorageBytesPerTask", storageSlot.getNumBytes());

if (!task.supportsQueries()) {
// Processing threads, processing buffers and merging buffers are not required on tasks which do not support queries

Nit: could you add the same comment in the k8s task adapter as well?

gianm commented Aug 13, 2024

Is it possible to do the logic in the peon itself rather than in the runners? i.e., if a peon is launched with a task that doesn't support queries, it doesn't create a merge pool or processing pool? That way, each way of launching a peon wouldn't need to be aware of this.

@LakshSingla (Contributor Author)
> Is it possible to do the logic in the peon itself rather than in the runners?

I looked at the following approaches but didn't find a suitable one:

  1. We can't modify DruidProcessingConfig, since it is shared by historicals etc. as well, and the Task.class we'd need for checking task.supportsQueries can't be added to that module (Task.class resides in the indexing module, which isn't a dependency of druid-server, where DruidProcessingConfig resides).
  2. I was looking for a way to override the providers for the CliPeon, and while I can do that (check out the last commit), I need to duplicate some of the DruidProcessingModule code too for the case where the task supportsQueries. I didn't find a way to conditionally override the binding (since the Task instance isn't present when we add our bindings), or to access the overridden bindings at runtime before overriding them.

LMK if there's a way that I am missing. Otherwise, there's some duplication in the CliPeon code, which we'd need to keep in sync with DruidProcessingModule.
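For readers unfamiliar with the Guice mechanism being discussed, here is a minimal, self-contained illustration of Modules.override (a generic example, not Druid code): bindings supplied via with(...) replace the base module's bindings.

import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.util.Modules;

public class OverrideExample
{
  public static void main(String[] args)
  {
    Injector injector = Guice.createInjector(
        Modules.override(
            new AbstractModule()
            {
              @Override
              protected void configure()
              {
                // binding from the "base" module
                bind(String.class).toInstance("base binding");
              }
            }
        ).with(
            new AbstractModule()
            {
              @Override
              protected void configure()
              {
                // this binding wins
                bind(String.class).toInstance("overridden binding");
              }
            }
        )
    );
    System.out.println(injector.getInstance(String.class)); // prints "overridden binding"
  }
}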

kfaraz commented Aug 19, 2024

@LakshSingla , while I completely agree with not installing the DruidProcessingModule if it is not needed, I was wondering if installing it actually consumes any resources.
IIUC, the buffers are all provided as lazy singletons and would be initialized/allocated only if needed by the task.

@kfaraz kfaraz left a comment


Left some suggestions.

@@ -205,7 +223,67 @@ public void configure(Properties properties)
protected List<? extends Module> getModules()
{
return ImmutableList.of(
new DruidProcessingModule(),
Modules.override(new DruidProcessingModule()).with(

Rather than doing Modules.override(), another option could be to write up a class TaskQueryProcessingModule extends DruidProcessingModule (or even just inline it here), where you could just call the super implementation, thus avoiding code duplication.

@LakshSingla (Contributor Author)

Thanks for the idea!

Comment on lines 244 to 253
if (!task.supportsQueries()) {
  return new ForwardingQueryProcessingPool(Execs.dummy());
}
return new MetricsEmittingQueryProcessingPool(
    PrioritizedExecutorService.create(
        lifecycle,
        config
    ),
    executorServiceMonitor
);
@kfaraz kfaraz Aug 19, 2024

Do not use a ForwardingQueryProcessingPool since it is not meant to be used anyway.

Suggested change
- if (!task.supportsQueries()) {
-   return new ForwardingQueryProcessingPool(Execs.dummy());
- }
- return new MetricsEmittingQueryProcessingPool(
-     PrioritizedExecutorService.create(
-         lifecycle,
-         config
-     ),
-     executorServiceMonitor
- );
+ if (task.supportsQueries()) {
+   return super.getProcessingPoolExecutor(args);
+ } else {
+   // I wonder if we shouldn't just throw an exception or return null here
+   return DirectQueryProcessingPool.INSTANCE;
+ }

A similar simplification can be done for other methods too.

@LakshSingla LakshSingla Aug 19, 2024

null would look better than the direct processing pool, since using DirectQueryProcessingPool.INSTANCE looks wrong. IMO it means doing everything in the calling thread, which isn't the expected behaviour. Also, I'll test whether throwing an exception works, but I think that would cause a Guice initialization error.

> since it is not meant to be used anyway

I didn't understand this part. Why should we not be using the ForwardingQueryProcessingPool? The benefit of my approach would be that the calling code wouldn't need to assume that the processing pool can be null anywhere, and handle that case separately. Moreover, it also acts as a safeguard in case any non-querying task tries to submit a task to the pool, instead of complacently executing the task in the same thread (as with the direct processing pool).


> I didn't understand this part. Why should we not be using the ForwardingQueryProcessingPool? The benefit of my approach would be that the calling code wouldn't need to assume that the processing pool can be null anywhere, and handle that case separately. Moreover, it also acts as a safeguard in case any non-querying task tries to submit a task to the pool, instead of complacently executing the task in the same thread (as with the direct processing pool).

I meant that if we know upfront that this task is not meant to use the query processing pool, then we should never return an instance that can be used at all, even if it causes the task to fail (since it was doing something illegal anyway).

I agree with your point about null.
How about we add a NoopQueryProcessingPool that throws an unsupported-operation exception when anything is submitted to it?

@kfaraz kfaraz Aug 19, 2024

Also, I wish the QueryProcessingPool didn't extend ListeningExecutorService.
It would make for a cleaner interface and it would have been much easier to write dummy implementations.
Are the executor service methods ever called on the query processing pool?


This is one usage of the processing pool as an executor service. Its javadoc also mentions such usages:

* This interface extends {@link ListeningExecutorService} as well. It has a separate
* method to submit query execution tasks so that implementations can differentiate those tasks from any regular async
* tasks. One example is {@link org.apache.druid.query.groupby.GroupingEngine#mergeRunners(QueryProcessingPool, Iterable)}
* where different kind of tasks are submitted to same processing pool.

I think a cleaner design would have been to have a method getExecutor() in the QueryProcessingPool interface. But since this is an @ExtensionPoint, I suppose we should leave it as is for now.
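For concreteness, a sketch of the alternative interface shape being described (hypothetical; the real QueryProcessingPool is not defined this way, and the submitRunnerTask signature is assumed from the existing interface):

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;

// Hypothetical alternative: expose the executor explicitly instead of
// inheriting the whole ListeningExecutorService API.
public interface QueryProcessingPoolAlternative
{
  // Query execution work units go through a dedicated method.
  <T, V> ListenableFuture<T> submitRunnerTask(PrioritizedQueryRunnerCallable<T, V> task);

  // Callers that need a plain executor (e.g. non-query async work in
  // GroupingEngine#mergeRunners) would ask for it explicitly.
  ListeningExecutorService getExecutor();
}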

@LakshSingla (Contributor Author)

> then we should never return an instance that can be used at all, even if it causes the task to fail

The ForwardingQueryProcessingPool(Execs.dummy()) would do exactly that unless I am mistaken. The task would be delegated to the dummy executor which throws UOE on any attempt to submit the task.
I attempted to create a NoopQueryProcessingPool while raising the PR, but it was doing the same thing. Maybe I can rename and make it clearer to read, or subclass the forwarding pool explicitly.


> The ForwardingQueryProcessingPool(Execs.dummy()) would do exactly that unless I am mistaken. The task would be delegated to the dummy executor which throws UOE on any attempt to submit the task.

While this is true, there are small differences in using a dedicated NoopQueryProcessingPool:

  • The intent is clearer to someone reading the code. Using the Noop implementation implies that it is meant to do nothing. Using a Forwarding pool with a dummy executor could mean that it is supposed to have partial functionality.
  • The error message (and perhaps the stack trace too) would be more user-friendly. When using Noop pool, the exception is thrown by the processing pool itself rather than the underlying dummy executor service.

That said, this is not a blocker for this PR as it is a style choice really.
There are already some quirks of the QueryProcessingPool interface that could use some cleanup. We could address this then.

new Module()
{
  @Override
  public void configure(Binder binder)

Need not override this method if extending DruidProcessingModule.

@LakshSingla (Contributor Author)
@kfaraz

> buffers are all provided as lazy singletons and would be initialized/allocated only if needed by the task.

The pool itself is created lazily, i.e. when the various query toolchests/runners/engines are created. Whether the buffer allocation is lazy depends on the type of pool:

  1. For the blocking pool, all the buffers are allocated upfront in the constructor.
  2. For the non-blocking pool, the buffers are allocated as you go. There's also an initialization count which determines the number of buffers to initialize at the beginning. This is set to the number of processing threads, which means that we still allocate the buffers at the beginning.

I have verified the above by looking at the logs of one of the controller tasks, which shouldn't be using the buffers:

2024-08-06T15:21:43,532 INFO [main] org.apache.druid.offheap.OffheapBufferGenerator - Allocating new intermediate processing buffer[0] of size[400,000,000]
2024-08-06T15:21:43,655 INFO [main] org.apache.druid.offheap.OffheapBufferGenerator - Allocating new intermediate processing buffer[1] of size[400,000,000]
2024-08-06T15:21:43,775 INFO [main] org.apache.druid.offheap.OffheapBufferGenerator - Allocating new result merging buffer[0] of size[400,000,000]
2024-08-06T15:21:43,894 INFO [main] org.apache.druid.offheap.OffheapBufferGenerator - Allocating new result merging buffer[1] of size[400,000,000]
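To make the eagerness point concrete, here is a generic sketch (plain JDK code, not Druid's pool classes) of why merely constructing an eagerly-allocating pool reserves direct memory, whether or not a query ever runs:

import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

public class EagerBufferPool
{
  private final Deque<ByteBuffer> buffers = new ArrayDeque<>();

  public EagerBufferPool(int numBuffers, int bufferSizeBytes)
  {
    for (int i = 0; i < numBuffers; i++) {
      // Direct memory is reserved here, at construction time.
      buffers.add(ByteBuffer.allocateDirect(bufferSizeBytes));
    }
  }

  public ByteBuffer take()
  {
    return buffers.poll();
  }

  public void giveBack(ByteBuffer buffer)
  {
    buffer.clear();
    buffers.add(buffer);
  }
}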

@LakshSingla (Contributor Author)
Thanks for the suggestion, that is much better than what I was trying to achieve with the latest commit.

@LakshSingla (Contributor Author)
@kfaraz
It seems that overriding @Provides methods isn't allowed by Guice. Unfortunately, using Modules.override() is the only way. Going ahead with the NoopQueryProcessingPool suggestion along with reverting the latest commit.

1) Overriding @Provides methods is not allowed.
        @Provides method: org.apache.druid.guice.DruidProcessingModule.getIntermediateResultsPool()
        overridden by: org.apache.druid.guice.PeonProcessingModule.getIntermediateResultsPool()
  at com.google.inject.internal.ProviderMethodsModule.getProviderMethods(ProviderMethodsModule.java:163)

2) Overriding @Provides methods is not allowed.
This reverts commit 83085d4.
kfaraz commented Aug 21, 2024

Ah, thanks for the clarification, @LakshSingla. Nice of Guice to give clear error messages.

> Going ahead with the NoopQueryProcessingPool suggestion along with reverting the latest commit.

You are too quick to jump between commits 😛.

There are still other things that can be done, like:
a) All the provider methods in DruidProcessingModule internally call a corresponding static creator method. PeonProcessingModule could use the same methods. Thus no code duplication, no override.
OR b) Both DruidProcessingModule and PeonProcessingModule extend a common base class which has the actual (non-provider) methods. Both the modules could override and annotate the methods appropriately.

Although, of these two, I think option (a) is better.
I can't think of other better ways right now, will let you know if something comes to mind.

For now, do you think the above suggestion seems viable?
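A rough sketch of option (a), reusing names that also appear in the diffs later in this thread (DruidProcessingModule.createIntermediateResultsPool, DummyNonBlockingPool, task.supportsQueries()); imports, annotations, and exact signatures are assumptions, not the merged code:

// Sketch only: DruidProcessingModule keeps static creator methods, and the
// peon-specific module either delegates to them (querying tasks) or binds
// dummy implementations (non-querying tasks). No Modules.override() needed.
public class PeonProcessingModule implements Module
{
  private final Task task;

  public PeonProcessingModule(Task task)
  {
    this.task = task;
  }

  @Override
  public void configure(Binder binder)
  {
    // common bindings, if any
  }

  @Provides
  @LazySingleton
  @Global
  public NonBlockingPool<ByteBuffer> getIntermediateResultsPool(DruidProcessingConfig config)
  {
    if (task.supportsQueries()) {
      // Reuse the shared static creator, so there is no code duplication.
      return DruidProcessingModule.createIntermediateResultsPool(config);
    } else {
      return DummyNonBlockingPool.instance();
    }
  }

  // Providers for the merge-buffer pool and the query processing pool would
  // follow the same if/else pattern.
}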

@LakshSingla (Contributor Author)
I think the first one seems neat. Lemme try it out.

@kfaraz kfaraz left a comment

Thanks for incorporating the feedback, @LakshSingla! Left some more suggestions.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ProcessingModuleHelper

You need not add a new class for the static methods. I think it is cleaner to just keep these methods in DruidProcessingModule itself. It would help with the review as well.

@LakshSingla LakshSingla Aug 21, 2024

After this change, DruidProcessingModule is more like the Historical+Indexer processing module. The same method for caching etc. is copied everywhere. I feel that it's neater to have these in a separate class, so that the methods can be used by other processing modules as well.

/**
* Implementation of {@link QueryProcessingPool} that throws when it is given any query execution task unit
*/
public class NoopQueryProcessingPool extends ForwardingQueryProcessingPool

If we are writing a Noop implementation, it should not extend the Forwarding pool; rather, it should implement QueryProcessingPool directly and throw an unsupported-operation (or equivalent) exception in all methods.
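A minimal sketch of that shape (illustrative only; the merged class may differ, and the inherited ListeningExecutorService methods are elided rather than implemented):

import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.error.DruidException;

// Sketch: a processing pool that refuses all work, so any misuse by a
// non-querying task fails fast with a clear, pool-specific message.
public class NoopQueryProcessingPool implements QueryProcessingPool
{
  private static final NoopQueryProcessingPool INSTANCE = new NoopQueryProcessingPool();

  public static NoopQueryProcessingPool instance()
  {
    return INSTANCE;
  }

  @Override
  public <T, V> ListenableFuture<T> submitRunnerTask(PrioritizedQueryRunnerCallable<T, V> task)
  {
    throw unsupported();
  }

  // All inherited ExecutorService / ListeningExecutorService methods (submit,
  // execute, invokeAll, shutdown, ...) are overridden the same way and also throw.

  private DruidException unsupported()
  {
    // Throw a fresh exception each time; only the message needs to be constant.
    return DruidException.defensive("Unexpected call made to NoopQueryProcessingPool");
  }
}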

@kfaraz kfaraz left a comment

Minor comments, rest looks good.

public class NoopQueryProcessingPool implements QueryProcessingPool
{
private static final QueryProcessingPool INSTANCE = new NoopQueryProcessingPool();
private static final DruidException UNSUPPORTED_EXCEPTION =

I am not sure if keeping an exception constant is desirable. You can keep the exception message as a constant but throw a fresh exception wherever needed.

private static final DruidException UNSUPPORTED_EXCEPTION =
DruidException.defensive("Unexpected call made to NoopQueryProcessingPool");

public static QueryProcessingPool instance()

Suggested change
public static QueryProcessingPool instance()
public static NoopQueryProcessingPool instance()

*/
public class NoopQueryProcessingPool implements QueryProcessingPool
{
private static final QueryProcessingPool INSTANCE = new NoopQueryProcessingPool();

Suggested change
private static final QueryProcessingPool INSTANCE = new NoopQueryProcessingPool();
private static final NoopQueryProcessingPool INSTANCE = new NoopQueryProcessingPool();

Comment on lines 105 to 108
if (!task.supportsQueries()) {
  return DummyNonBlockingPool.instance();
}
return DruidProcessingModule.createIntermediateResultsPool(config);

Maybe invert the condition for readability:

Suggested change
- if (!task.supportsQueries()) {
-   return DummyNonBlockingPool.instance();
- }
- return DruidProcessingModule.createIntermediateResultsPool(config);
+ if (task.supportsQueries()) {
+   return DruidProcessingModule.createIntermediateResultsPool(config);
+ } else {
+   return DummyNonBlockingPool.instance();
+ }


Same comment in other methods.

@kfaraz kfaraz left a comment

Thanks for the changes, @LakshSingla!

@LakshSingla (Contributor Author)

Tests are failing due to insufficient coverage of the changes made to the processing module.

@LakshSingla LakshSingla merged commit 72fbaf2 into apache:master Sep 10, 2024
80 of 90 checks passed
@LakshSingla LakshSingla deleted the controller-no-buffers branch September 10, 2024 06:06
@LakshSingla LakshSingla mentioned this pull request Oct 9, 2024
@adarshsanjeev adarshsanjeev added this to the 32.0.0 milestone Jan 16, 2025