-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Non querying tasks shouldn't use processing buffers / merge buffers #16887
Non querying tasks shouldn't use processing buffers / merge buffers #16887
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changes LGTM
@@ -364,6 +364,13 @@ public TaskStatus call() | |||
command.addSystemProperty("druid.indexer.task.baseTaskDir", storageSlot.getDirectory().getAbsolutePath()); | |||
command.addSystemProperty("druid.indexer.task.tmpStorageBytesPerTask", storageSlot.getNumBytes()); | |||
|
|||
if (!task.supportsQueries()) { | |||
// Processing threads, processing buffers and merging buffers are not required on tasks which |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: could you add the same comment in k8 task adapter as well.
Is it possible to do the logic in the peon itself rather than in the runners? i.e., if a peon is launched with a task that doesn't support queries, it doesn't create a merge pool or processing pool? That way, each way of launching a peon wouldn't need to be aware of this. |
I looked at the following approaches but didn't find a suitable one:
LMK if there's a way that I am missing. Otherwise, there's some duplication in the |
@LakshSingla , while I completely agree with not installing the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left some suggestions.
@@ -205,7 +223,67 @@ public void configure(Properties properties) | |||
protected List<? extends Module> getModules() | |||
{ | |||
return ImmutableList.of( | |||
new DruidProcessingModule(), | |||
Modules.override(new DruidProcessingModule()).with( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rather than doing Modules.override()
, another option could be to write up a class TaskQueryProcessingModule extends DruidProcessingModule
(or even just inline it here), where you could just call the super implementation, thus avoiding code duplication.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the idea!
if (!task.supportsQueries()) { | ||
return new ForwardingQueryProcessingPool(Execs.dummy()); | ||
} | ||
return new MetricsEmittingQueryProcessingPool( | ||
PrioritizedExecutorService.create( | ||
lifecycle, | ||
config | ||
), | ||
executorServiceMonitor | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do not use a ForwardingQueryProcessingPool
since it is not meant to be used anyway.
if (!task.supportsQueries()) { | |
return new ForwardingQueryProcessingPool(Execs.dummy()); | |
} | |
return new MetricsEmittingQueryProcessingPool( | |
PrioritizedExecutorService.create( | |
lifecycle, | |
config | |
), | |
executorServiceMonitor | |
); | |
if (task.supportsQueries()) { | |
return super.getProcessingPoolExecutor(args); | |
} else { | |
// I wonder if we shouldn't just throw an exception or return null here | |
return DirectQueryProcessingPool.INSTANCE; | |
} |
A similar simplification can be done for other methods too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
null would look better than the direct processing pool since using the DirectQueryProcessingPool.INSTANCE
looks wrong. IMO it means to do everything in the calling thread which isn't the expected behaviour. Also, I'll test if throwing an exception works, but I think that would cause guice initialization error.
since it is not meant to be used anyway
I didn't understand this part. Why should we not be using the ForwardingQueryProcessingPool
. The benefit of my approach would be that the calling code wouldn't need to assume that the processing pool can be null
anywhere, and handle that case separately. Moreover, it also acts as a safeguard in case any non querying task tries to submit a task to the pool, instead of complacently executing the task in the same thread (as with the direct processing pool).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't understand this part. Why should we not be using the ForwardingQueryProcessingPool. The benefit of my approach would be that the calling code wouldn't need to assume that the processing pool can be null anywhere, and handle that case separately. Moreover, it also acts as a safeguard in case any non querying task tries to submit a task to the pool, instead of complacently executing the task in the same thread (as with the direct processing pool).
I meant that if we know upfront that this task is not meant to use the query processing pool, then we should never return an instance that can be used at all, even if it causes the task to fail (since it was doing something illegal anyway).
I agree with your point about null
.
How about we add a NoopQueryProcessingPool
that throws Unsupported
exception when anything is submitted to it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, I wish the QueryProcessingPool
didn't extend ListeningExecutorService
.
It would make for a cleaner interface and it would have been much easier to write dummy implementations.
Are the executor service methods ever called on the query processing pool?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Line 214 in 2198001
queryProcessingPool, // Passed as executor service |
This is one usage of the processing pool as an executor service. Its javadoc also mentions such usages
druid/processing/src/main/java/org/apache/druid/query/QueryProcessingPool.java
Lines 31 to 34 in 2198001
* This interface extends {@link ListeningExecutorService} as well. It has a separate | |
* method to submit query execution tasks so that implementations can differentiate those tasks from any regular async | |
* tasks. One example is {@link org.apache.druid.query.groupby.GroupingEngine#mergeRunners(QueryProcessingPool, Iterable)} | |
* where different kind of tasks are submitted to same processing pool. |
I think a cleaner design would have been to have a method getExecutor()
in the QueryProcessingPool
interface. But since this is an @ExtensionPoint
, I suppose we should leave it as is for now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
then we should never return an instance that can be used at all, even if it causes the task to fail
The ForwardingQueryProcessingPool(Execs.dummy())
would do exactly that unless I am mistaken. The task would be delegated to the dummy executor which throws UOE on any attempt to submit the task.
I attempted to create a NoopQueryProcessingPool
while raising the PR, but it was doing the same thing. Maybe I can rename and make it clearer to read, or subclass the forwarding pool explicitly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The ForwardingQueryProcessingPool(Execs.dummy()) would do exactly that unless I am mistaken. The task would be delegated to the dummy executor which throws UOE on any attempt to submit the task.
While this is true, there are small differences in using a dedicated NoopQueryProcessingPool
:
- The intent is clearer to someone reading the code. Using the
Noop
implementation implies that it is meant to do nothing. Using aForwarding
pool with a dummy executor could mean that it is supposed to have partial functionality. - The error message (and perhaps the stack trace too) would be more user-friendly. When using
Noop
pool, the exception is thrown by the processing pool itself rather than the underlying dummy executor service.
That said, this is not a blocker for this PR as it is a style choice really.
There are already some quirks of the QueryProcessingPool
interface that could use some cleanup. We could address this then.
new Module() | ||
{ | ||
@Override | ||
public void configure(Binder binder) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need not override this method if extending DruidProcessingModule
.
The pool is created lazily, which is when the various query toolchests/runners/engines are created. The allocation of the buffer can or cannot be lazy depending on the type of the pool.
I have verified the above by looking at one of the controller logs, which shouldn't be using the buffers.
|
Thanks for the suggestion, that is much better than what I was trying to achieve with the latest commit. |
@kfaraz
|
This reverts commit 83085d4.
Ah, thanks for the clarification, @LakshSingla . Nice of Guice to give clear error messages.
You are too quick to jump between commits 😛 . There are still other things that can be done, like: Although, I think out of these two, option (a) is better. For now, do you think the above suggestion seems viable? |
I think the first one seems neat. Lemme try it out. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for incorporating the feedback, @LakshSingla ! Left some more suggestions.
import java.util.concurrent.ExecutorService; | ||
import java.util.concurrent.Executors; | ||
|
||
public class ProcessingModuleHelper |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You need not add a new class for the static methods. I think it is cleaner to just keep these methods in DruidProcessingModule
itself. It would help with the review as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After this change, DruidProcessingModule is more like the Historical+Indexer processing module. The same method for caching etc is copied everywhere. I feel that its neater to have it in a separate method, so that the methods can be used by other processing modules as well.
/** | ||
* Implementation of {@link QueryProcessingPool} that throws when it is given any query execution task unit | ||
*/ | ||
public class NoopQueryProcessingPool extends ForwardingQueryProcessingPool |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we are writing a Noop
implementation, it should not extend the Forwarding
pool, rather implement the QueryProcessingPool
directly and throw unsupported or equivalent exception in all methods.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor comments, rest looks good.
public class NoopQueryProcessingPool implements QueryProcessingPool | ||
{ | ||
private static final QueryProcessingPool INSTANCE = new NoopQueryProcessingPool(); | ||
private static final DruidException UNSUPPORTED_EXCEPTION = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure if keeping an exception constant is desirable. You can keep the exception message as a constant but throw a fresh exception wherever needed.
private static final DruidException UNSUPPORTED_EXCEPTION = | ||
DruidException.defensive("Unexpected call made to NoopQueryProcessingPool"); | ||
|
||
public static QueryProcessingPool instance() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
public static QueryProcessingPool instance() | |
public static NoopQueryProcessingPool instance() |
*/ | ||
public class NoopQueryProcessingPool implements QueryProcessingPool | ||
{ | ||
private static final QueryProcessingPool INSTANCE = new NoopQueryProcessingPool(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private static final QueryProcessingPool INSTANCE = new NoopQueryProcessingPool(); | |
private static final NoopQueryProcessingPool INSTANCE = new NoopQueryProcessingPool(); |
if (!task.supportsQueries()) { | ||
return DummyNonBlockingPool.instance(); | ||
} | ||
return DruidProcessingModule.createIntermediateResultsPool(config); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe invert the condition for readability:
if (!task.supportsQueries()) { | |
return DummyNonBlockingPool.instance(); | |
} | |
return DruidProcessingModule.createIntermediateResultsPool(config); | |
if (task.supportsQueries()) { | |
return DruidProcessingModule.createIntermediateResultsPool(config); | |
} else { | |
return DummyNonBlockingPool.instance(); | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same comment in other methods.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the changes, @LakshSingla !
Tests are failing due to insufficient coverage of the changes made to the processing module. |
Description
Tasks that do not support querying or query processing i.e.
supportsQueries = false
do not require processing threads, processing buffers, and merge buffers.The following tasks don't support queries -
Release note
Reduce the direct memory requirement on the non query processing tasks by not reserving the query buffers for those.
This PR has: