Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Manually trigger PeriodicTask #7174

Merged
merged 50 commits into from
Aug 23, 2021
Merged

Manually trigger PeriodicTask #7174

merged 50 commits into from
Aug 23, 2021

Conversation

amrishlal
Copy link
Contributor

@amrishlal amrishlal commented Jul 18, 2021

Description

This PR defines a REST API (see /periodictask/run in PinotControllerPeriodicTaskRestletResource.java) to manually trigger a named periodic task. If a table name is provided with the API call, the task will only run against the specified table. If no table name is provided, task will run against all tables. Once triggered, the REST API call will send a Helix message (RunPeriodicTaskMessage.java) to all controllers including itself. Any new user-defined helix message that is received by a controller is processed by ControllerMessageHandlerFactory.java which is registered for handling messages in BaseControllerStarter.java. Upon getting the message, ControllerMessageHandlerFactory will create RunPeriodicTaskMessageHandler object to process the message (see ControllerMessageHandlerFactory.createHandler function). RunPeriodicTaskMessageHandler will then call PeriodicTaskScheduler to execute the named task. To prevent thread conflict, additional synchornization was added to PeriodicTaskScheduler so that a task does not run more than once at any given time. A test case was added to PeriodicTaskSchedulerTest.java to verify this.

Upgrade Notes

Does this PR prevent a zero down-time upgrade? (Assume upgrade order: Controller, Broker, Server, Minion)

  • Yes (Please label as backward-incompat, and complete the section below on Release Notes)

Does this PR fix a zero-downtime upgrade introduced earlier?

  • Yes (Please label this as backward-incompat, and complete the section below on Release Notes)

Does this PR otherwise need attention when creating release notes? Things to consider:

  • New configuration options
  • Deprecation of configurations
  • Signature changes to public methods/interfaces
  • New plugins added or old plugins removed
  • Yes (Please label this PR as release-notes and complete the section on Release Notes)

Release Notes

Documentation

@codecov-commenter
Copy link

codecov-commenter commented Jul 18, 2021

Codecov Report

Merging #7174 (b8e1f54) into master (f11b5d6) will decrease coverage by 6.63%.
The diff coverage is 45.86%.

❗ Current head b8e1f54 differs from pull request most recent head 9c35754. Consider uploading reports for the commit 9c35754 to get more accurate results
Impacted file tree graph

@@             Coverage Diff              @@
##             master    #7174      +/-   ##
============================================
- Coverage     71.39%   64.75%   -6.64%     
- Complexity     3284     3296      +12     
============================================
  Files          1503     1462      -41     
  Lines         73871    72827    -1044     
  Branches      10699    10640      -59     
============================================
- Hits          52738    47157    -5581     
- Misses        17538    22300    +4762     
+ Partials       3595     3370     -225     
Flag Coverage Δ
integration1 ?
integration2 ?
unittests1 69.42% <40.70%> (+0.33%) ⬆️
unittests2 14.51% <31.08%> (-0.08%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
...org/apache/pinot/broker/api/RequestStatistics.java 41.89% <0.00%> (-22.98%) ⬇️
...e/pinot/broker/api/resources/PinotBrokerDebug.java 0.00% <ø> (-76.48%) ⬇️
...t/broker/api/resources/PinotBrokerHealthCheck.java 0.00% <ø> (ø)
...pinot/broker/api/resources/PinotBrokerRouting.java 0.00% <ø> (ø)
...pinot/broker/api/resources/PinotClientRequest.java 0.00% <0.00%> (-34.43%) ⬇️
...e/pinot/broker/broker/helix/BaseBrokerStarter.java 71.34% <ø> (-2.25%) ⬇️
...not/broker/broker/helix/ClusterChangeMediator.java 78.02% <ø> (-2.20%) ⬇️
.../pinot/broker/broker/helix/HelixBrokerStarter.java 7.40% <0.00%> (ø)
...org/apache/pinot/broker/queryquota/HitCounter.java 100.00% <ø> (ø)
...che/pinot/broker/queryquota/MaxHitRateTracker.java 95.45% <ø> (ø)
... and 771 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update f11b5d6...9c35754. Read the comment docs.

@amrishlal amrishlal changed the title [WIP] Manually trigger PeriodicTask Manually trigger PeriodicTask Jul 25, 2021
@amrishlal amrishlal marked this pull request as ready for review July 25, 2021 20:25
@mcvsubbu
Copy link
Contributor

mcvsubbu commented Aug 4, 2021

Invalidates PR #6983

public RunPeriodicTaskMessage(Message message) {
super(message.getRecord());
String msgSubType = message.getMsgSubType();
Preconditions.checkArgument(msgSubType.equals(RUN_PERIODIC_TASK_MSG_SUB_TYPE),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isnt this check redundant?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed. I basically used the same pattern as other message classes, but yes the check appears to be redundant.

return new RunPeriodicTaskMessageHandler(new RunPeriodicTaskMessage(message), notificationContext, _periodicTaskScheduler);
}

LOGGER.error("Bad message type {} received by controller. ", messageType);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
LOGGER.error("Bad message type {} received by controller. ", messageType);
LOGGER.warn("Unknown message type {} received by controller. ", messageType);

We may add new message types later, and not be able to upgrade all controllers at the same time to understand the new message. Better to drop any unknown message with a warning or even just INFO. It is a valid upgrade use case

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


@Override
public String getMessageType() {
return Message.MessageType.USER_DEFINE_MSG.toString();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private static?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@Override
public HelixTaskResult handleMessage()
throws InterruptedException {
LOGGER.info("Handle RunPeriodicTaskMessage by executing task {}", _periodicTaskName);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
LOGGER.info("Handle RunPeriodicTaskMessage by executing task {}", _periodicTaskName);
LOGGER.info("Handling RunPeriodicTaskMessage by executing task {}", _periodicTaskName);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

* Execute specified {@link PeriodicTask} immediately. If the task is already running, wait for the running task
* to finish before executing the task again.
*/
public void execute(String periodicTaskName, String tableName) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public void execute(String periodicTaskName, String tableName) {
public void execute(String periodicTaskName, @Nullable String tableName) {

Also, suggest rename method to scheduleNow() ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@Override
protected void runTask() {
protected void runTask(String filter) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of adding a filter that is to be interpreted by each subclass, please add shouldRunForTable() in base periodic task, and use that in all the sub-classes (that operate on a per-table basis). You can then add all logic there.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about that, but the issue there is that if we put shouldRunForTable() in base class, it may not apply to all subclasses? How about adding a function setAttribute(String key, String value) in the base class and this could be called as setAttribute("tablename", tableName)? For now, the only attribute, we would allow is "tablename". This way there is no strong association between table name and all the task classes which may not use the table name?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will apply to ControllerPeriodicTask. The other tasks are not table-based, and so it is ok

Copy link
Contributor Author

@amrishlal amrishlal Aug 10, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made changes (summarized below) based on our offline discussion.

Response.Status.NOT_FOUND);
}

LOGGER.info("Sending periodic task execution message for {} to all controllers.", periodicTaskName);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add table name to this log message, if provided (otherwise include the string "for all tables")

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


_executorService.schedule(() -> {
try {
// To prevent thread conflict, this call will block if the same task is already running (see
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TBD whether we should queue and re-run the task immediately after one run. I would suggest don't bother. If it is running, just return with an INFO message saying so.

I am eager to hear the requirements here and what others have to say.

}

LOGGER.info("Sending periodic task execution message to all controllers for running task {} against {}.",
periodicTaskName, tableName != null ? tableName + " table" : "all tables");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
periodicTaskName, tableName != null ? tableName + " table" : "all tables");
periodicTaskName, tableName != null ? " table '" + tableName + "'" : "all tables");

This will ensure that if by some chance the user specifies some weird characters, we can find out here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

} else {
LOGGER.info("Task: {} is finished in {}ms", waitTimeMs);
LOGGER.warn("Task: {} finished within timeout of {}ms", MAX_PERIODIC_TASK_STOP_TIME_MILLIS);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think info is ok here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

/** Message handler for "Run Periodic Task" message. */
private static class RunPeriodicTaskMessageHandler extends MessageHandler {
private final String _periodicTaskName;
private final String _tableName;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this table name be any table name like raw table name and table name with suffix? Can we add some javadoc on that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Modified variable declaration:

`private final List<String> _tableNamesWithType;`

@ApiOperation(value = "Run controller periodic task against the specified table. If no table name is specified, task will run against all tables.")
public boolean runPeriodicTask(
@ApiParam(value = "Periodic Task Name", required = true) @QueryParam("taskname") String periodicTaskName,
@ApiParam(value = "Table Name (with type)", required = false) @QueryParam("tablename") String tableName) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can it be a list of table names?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm... a good idea, but this entire thing is a manual process. It should be easy to (write a script to) invoke the api for a bunch of tables if needed. Instead of complicating the internals, my suggestion is to let the API support the two extreme cases -- one table, all tables.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, changed into list.

@ApiOperation(value = "Get comma-delimited list of all available periodic task names.")
public ArrayList<String> getPeriodicTaskNames() {
ArrayList<String> list = new ArrayList<>();
list.add(org.apache.pinot.controller.validation.BrokerResourceValidationManager.class.getSimpleName());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a variable called _tasksWithValidInterval in PeriodicTaskScheduler class, sth you can refer to instead of keeping a list of controller names here, which is hard to maintain.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea. Done.

* Execute the task once. This method will calls the {@link #run} method.
* @param filter An implementation specific string that may dictate how the task will be run. null by default.
*/
void run(String filter);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use Context object instead of a filter string? By using Context, it'll be easier to maintain and more flexible to extend.

Copy link
Contributor Author

@amrishlal amrishlal Aug 10, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made changes (summarized below) based on our offline discussion.

Copy link
Contributor

@mcvsubbu mcvsubbu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm otherwise

Response.Status.NOT_FOUND);
}

if (!_pinotHelixResourceManager.getAllRawTables().contains(tableName)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

table name is optional, so this could be null here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.


private static Properties createTaskProperties(String periodicTaskReqeustId, String tableNameWithType) {
Properties periodicTaskParameters = new Properties();
periodicTaskParameters.setProperty("requestid", periodicTaskReqeustId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let us put the keys as static strings

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

_running = true;

String periodicTaskRequestId =
_activePeriodicTaskProperties != null ? _activePeriodicTaskProperties.getProperty("requestid") : null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

import from a static string define instead of hard-coding "requestid"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

_running = false;
@Override
public void run(@Nullable java.util.Properties periodicTaskProperties) {
Properties savedPeriodicTaskProperties = _activePeriodicTaskProperties;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But then it will be set to the value before locking. You need to get it after lock, and then restore it before releasing the lock.

_activePeriodicTaskProperties != null ? _activePeriodicTaskProperties.getProperty(PeriodicTask.PROPERTY_KEY_REQUEST_ID) : null;
if (_started) {
long startTime = System.currentTimeMillis();
LOGGER.info("[TaskRequestId: {}] Start running task: {}", periodicTaskRequestId, _taskName);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If periodicTaskId is null, we will see the log as [TaskRequestId: null]. Not the best experience. Maybe set the requireId to be some specific value ("automatic" or "periodic execution" or some such, maybe?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

Copy link
Contributor

@Jackie-Jiang Jackie-Jiang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Partially reviewed. Will finish the review by today

* the lead controller. The message is sent whenever API call for executing a PeriodicTask is invoked.
*/
public class RunPeriodicTaskMessage extends Message {
public static final String RUN_PERIODIC_TASK_MSG_SUB_TYPE = "PERIODIC_TASK";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(nit) RUN_PERIODIC_TASK to be more specific?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


/** Message handler for {@link RunPeriodicTaskMessage} message. */
private static class RunPeriodicTaskMessageHandler extends MessageHandler {
private final String _periodicTaskReqeustId;
Copy link
Contributor

@Jackie-Jiang Jackie-Jiang Aug 16, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo Request

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

* @param periodicTaskName Name of the task that will be run.
* @param tableNameWithType Table (names with type suffix) on which task will run.
*/
public RunPeriodicTaskMessage(@Nonnull String taskRequestId, @Nonnull String periodicTaskName,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's not use Nonnull (assume arguments are non-null if not annotated), but mark Nullable instead for nullable argument

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

public static final String RUN_PERIODIC_TASK_MSG_SUB_TYPE = "PERIODIC_TASK";
private static final String PERIODIC_TASK_REQUEST_ID = "taskRequestId";
private static final String PERIODIC_TASK_NAME_KEY = "periodicTaskName";
private static final String TABLE_NAME_WITH_TYPE_KEY = "tableNameWithType";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems all other messages put tableName as the key, should we try to match them?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had this q before, but our periodic tasks take table name with a type, and not raw table name. We can have two independent keys if you like

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All the user-defined messages always have table name with type (resource name) but use key tableName. I kind of prefer matching them, but either way is okay

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pending...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mcvsubbu @amrishlal Please take a look at SegmentRefreshMessage and RoutingTableRebuildMessage and see whether we want to match them

Comment on lines 33 to 34
private static final String PERIODIC_TASK_REQUEST_ID = "taskRequestId";
private static final String PERIODIC_TASK_NAME_KEY = "periodicTaskName";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we simplify them to requestId and taskName?
Side question, is requestId for logging and debugging purpose only?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now, yes. In future, we can potentially add some tracking on the controllers that receive it, and retrieve something via controller APIs as to task status.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed.


public RunPeriodicTaskMessage(Message message) {
super(message.getRecord());
String msgSubType = message.getMsgSubType();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed.



/** Factory class for creating message handlers for incoming helix messages. */
public class ControllerMessageHandlerFactory implements MessageHandlerFactory {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest renaming it to ControllerUserDefinedMessageHandlerFactory to be more specific

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed.

@@ -28,6 +29,10 @@
@ThreadSafe
public interface PeriodicTask extends Runnable {

// PeriodicTask objects may take a {@link Properties} object. Define all the keys property keys here.
String PROPERTY_KEY_REQUEST_ID = "requestid";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we match these 2 keys with the ones we used in the message?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}

/** Execute {@link PeriodicTask} immediately on the specified table. */
public void scheduleNow(String periodicTaskName, @Nullable Properties periodicTaskProperties) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

properties cannot be null here, or it will throw NPE

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

} catch (Throwable t) {
// catch all errors to prevent subsequent executions from being silently suppressed
// Ref: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledExecutorService.html#scheduleWithFixedDelay-java.lang.Runnable-long-long-java.util.concurrent.TimeUnit-
LOGGER.warn("[TaskRequestId: {}] Caught exception while attempting to execute named periodic task: {}",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel we should log error instead

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

}

/** @return List of tasks name that will run periodically. */
public List<String> getTaskNameList() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(nit) getTaskNames()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

}
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please keep the error message

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.


private volatile boolean _started;
private volatile boolean _running;

// Properties that task may use during execution. null by default.
protected Properties _activePeriodicTaskProperties;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Major)
Keep swapping the _activePeriodicTaskProperties and expect the child class to use it to identify the table name is hard to manage.
We should add an abstract method runTask(Properties periodicTaskProperties) and the child class can implement accordingly

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Amrish had previously done the approach of runTask(Properties) . It did not seem best since each sub-task had to be modified.
I think things have changed enough that we can take one more look at that. @amrishlal maybe you can have a quick discussion with Jackie and point out the problems. I am fine with either approach.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can make it call runTask() by default, and override it for ControllerPeriodicTask. In ControllerPeriodicTask, both of them should call processTables() internally

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is a multi-threaded class, we want to have all the threads that want to do the same thing (i.e run a task) pass through the same execution chain using the same state variables. Doing this will avoid a lot of confusion with respect to what thread is executing where and which state variables that thread is using and how and where locks are being acquired and released. The alternative (which I was trying earlier as Subbu mentioned) seemed to be leading to a situation where we would not only have multiple threads, but also multiple paths of execution (even if that path is just a few functions deep and converges later on) and multiple references to the same state variable (base class has _activePeriodicTaskProperties, do we pass that on the stack as well to child classes when it is already available to all child classes. If not, then why are we passing the "other" properties object on stack, etc). Basically, it was bringing up too many issues and ending up with making too many assumptions; hence, I abandoned the approach.

The main problem here is that we need to replace the active _activePeriodicTaskProperties object and that is really all that needs to be done to make the same code work for all threads who want to do the same thing, so the simplest way to do that seemed to be was to create a run(properties) method to swap out _activePeriodicTaskProperties under lock and then call run() method to let the rest of the code do what it is already doing.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logically they are 2 different tasks: one periodic trigger to process all tables lead by the controller; the other one adhoc to process based on the custom properties. The second one might only need to process one table, why mixing them and force the second one to scan potentially thousands of tables?
Also, I don't agree that for multi-threaded class, we want to follow the same execution chain and mixing the logics. It won't really help with the race condition handling, and more complicated logic within a single method can lead to more bugs.

} else {
LOGGER.info("Task: {} is finished in {}ms", waitTimeMs);
LOGGER.info("Task: {} finished within timeout of {}ms", MAX_PERIODIC_TASK_STOP_TIME_MILLIS);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing _taskName. Also would suggest put the actual waiting time instead of the timeout

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

@Override
protected final void runTask() {
_controllerMetrics.addMeteredTableValue(_taskName, ControllerMeter.CONTROLLER_PERIODIC_TASK_RUN, 1L);
try {
// Process the tables that are managed by this controller
// TODO: creating tablesToProcess list below is redundant since processTables will unroll the list anyway. This
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't quite follow this TODO. tablesToProcess gathers all the tables that are managed by this controller (this controller is the leader for these tables), and processTables() does not perform leader check

Copy link
Contributor Author

@amrishlal amrishlal Aug 18, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a function, ControllerPeriodicTask.runTask() doesn't do much. If it was merged with the ControllerPeriodicTask.processTables function then there would be no need to create the tablesToProcess array because processTable could directly be called on the tables that pass the filter criteria.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want to make them modular so that they can be reused. I agree current way will create one extra array, but that should not cause too much overhead (especially when not in performance critical path). With processTables() separate from the runTask(), we can override it (e.g. in PinotTaskManager) or reuse it. In this PR, since we might only want to process one table, we should not call runTask() and loop over all the tables (this overhead will be much bigger than creating an extra array), but just check if the target table is lead by the controller can call processTables() on one single table.

List<String> tablesToProcess = new ArrayList<>();
for (String tableNameWithType : _pinotHelixResourceManager.getAllTables()) {
if (_leadControllerManager.isLeaderForTable(tableNameWithType)) {
if (shouldRunTaskForTable(tableNameWithType)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be very inefficient when table name is already provided. We should only check for the provided table name instead.
If we have a separate method runTask(Properties periodicTaskProperties) it will be much cleaner

Copy link
Contributor Author

@amrishlal amrishlal Aug 18, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactored a bit to make it more efficient.

Copy link
Contributor

@Jackie-Jiang Jackie-Jiang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM otherwise

public static final String RUN_PERIODIC_TASK_MSG_SUB_TYPE = "PERIODIC_TASK";
private static final String PERIODIC_TASK_REQUEST_ID = "taskRequestId";
private static final String PERIODIC_TASK_NAME_KEY = "periodicTaskName";
private static final String TABLE_NAME_WITH_TYPE_KEY = "tableNameWithType";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mcvsubbu @amrishlal Please take a look at SegmentRefreshMessage and RoutingTableRebuildMessage and see whether we want to match them

} else {
// Table name is available, so task should run only on the specified table.
if (_leadControllerManager.isLeaderForTable(propTableNameWithType)) {
tablesToProcess.add(propTableNameWithType);
}
}
processTables(tablesToProcess);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only call processTables when tablesToProcess is not empty to avoid the overhead

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

// Default properties that tasks may use during execution. This variable is private and does not have any get or set
// methods to prevent subclasses from gaining direct access to this variable. See run(Properties) method to see how
// properties are passed and used during task execution.
private Properties _activePeriodicTaskProperties;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make this a constant and rename it

Suggested change
private Properties _activePeriodicTaskProperties;
private static final Properties DEFAULT_PERIODIC_TASK_PROPERTIES;
static {
DEFAULT_PERIODIC_TASK_PROPERTIES = new Properties();
DEFAULT_PERIODIC_TASK_PROPERTIES.put(PeriodicTask.PROPERTY_KEY_REQUEST_ID, DEFAULT_REQUEST_ID);
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@mcvsubbu mcvsubbu merged commit 7a44f4a into apache:master Aug 23, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants