-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Manually trigger PeriodicTask #7174
Conversation
Codecov Report
@@ Coverage Diff @@
## master #7174 +/- ##
============================================
- Coverage 71.39% 64.75% -6.64%
- Complexity 3284 3296 +12
============================================
Files 1503 1462 -41
Lines 73871 72827 -1044
Branches 10699 10640 -59
============================================
- Hits 52738 47157 -5581
- Misses 17538 22300 +4762
+ Partials 3595 3370 -225
Flags with carried forward coverage won't be shown. Click here to find out more.
Continue to review full report at Codecov.
|
Invalidates PR #6983 |
public RunPeriodicTaskMessage(Message message) { | ||
super(message.getRecord()); | ||
String msgSubType = message.getMsgSubType(); | ||
Preconditions.checkArgument(msgSubType.equals(RUN_PERIODIC_TASK_MSG_SUB_TYPE), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isnt this check redundant?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed. I basically used the same pattern as other message classes, but yes the check appears to be redundant.
return new RunPeriodicTaskMessageHandler(new RunPeriodicTaskMessage(message), notificationContext, _periodicTaskScheduler); | ||
} | ||
|
||
LOGGER.error("Bad message type {} received by controller. ", messageType); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LOGGER.error("Bad message type {} received by controller. ", messageType); | |
LOGGER.warn("Unknown message type {} received by controller. ", messageType); |
We may add new message types later, and not be able to upgrade all controllers at the same time to understand the new message. Better to drop any unknown message with a warning or even just INFO. It is a valid upgrade use case
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
|
||
@Override | ||
public String getMessageType() { | ||
return Message.MessageType.USER_DEFINE_MSG.toString(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private static?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
@Override | ||
public HelixTaskResult handleMessage() | ||
throws InterruptedException { | ||
LOGGER.info("Handle RunPeriodicTaskMessage by executing task {}", _periodicTaskName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LOGGER.info("Handle RunPeriodicTaskMessage by executing task {}", _periodicTaskName); | |
LOGGER.info("Handling RunPeriodicTaskMessage by executing task {}", _periodicTaskName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
* Execute specified {@link PeriodicTask} immediately. If the task is already running, wait for the running task | ||
* to finish before executing the task again. | ||
*/ | ||
public void execute(String periodicTaskName, String tableName) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
public void execute(String periodicTaskName, String tableName) { | |
public void execute(String periodicTaskName, @Nullable String tableName) { |
Also, suggest rename method to scheduleNow()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java
Show resolved
Hide resolved
@Override | ||
protected void runTask() { | ||
protected void runTask(String filter) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of adding a filter that is to be interpreted by each subclass, please add shouldRunForTable()
in base periodic task, and use that in all the sub-classes (that operate on a per-table basis). You can then add all logic there.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought about that, but the issue there is that if we put shouldRunForTable()
in base class, it may not apply to all subclasses? How about adding a function setAttribute(String key, String value)
in the base class and this could be called as setAttribute("tablename", tableName)
? For now, the only attribute, we would allow is "tablename". This way there is no strong association between table name and all the task classes which may not use the table name?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It will apply to ControllerPeriodicTask
. The other tasks are not table-based, and so it is ok
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Made changes (summarized below) based on our offline discussion.
Response.Status.NOT_FOUND); | ||
} | ||
|
||
LOGGER.info("Sending periodic task execution message for {} to all controllers.", periodicTaskName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add table name to this log message, if provided (otherwise include the string "for all tables")
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
|
||
_executorService.schedule(() -> { | ||
try { | ||
// To prevent thread conflict, this call will block if the same task is already running (see |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TBD whether we should queue and re-run the task immediately after one run. I would suggest don't bother. If it is running, just return with an INFO message saying so.
I am eager to hear the requirements here and what others have to say.
...va/org/apache/pinot/controller/api/resources/PinotControllerPeriodicTaskRestletResource.java
Show resolved
Hide resolved
pinot-controller/src/main/java/org/apache/pinot/controller/ControllerMessageHandlerFactory.java
Outdated
Show resolved
Hide resolved
pinot-controller/src/main/java/org/apache/pinot/controller/ControllerMessageHandlerFactory.java
Outdated
Show resolved
Hide resolved
} | ||
|
||
LOGGER.info("Sending periodic task execution message to all controllers for running task {} against {}.", | ||
periodicTaskName, tableName != null ? tableName + " table" : "all tables"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
periodicTaskName, tableName != null ? tableName + " table" : "all tables"); | |
periodicTaskName, tableName != null ? " table '" + tableName + "'" : "all tables"); |
This will ensure that if by some chance the user specifies some weird characters, we can find out here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java
Show resolved
Hide resolved
} else { | ||
LOGGER.info("Task: {} is finished in {}ms", waitTimeMs); | ||
LOGGER.warn("Task: {} finished within timeout of {}ms", MAX_PERIODIC_TASK_STOP_TIME_MILLIS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think info
is ok here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
/** Message handler for "Run Periodic Task" message. */ | ||
private static class RunPeriodicTaskMessageHandler extends MessageHandler { | ||
private final String _periodicTaskName; | ||
private final String _tableName; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could this table name be any table name like raw table name and table name with suffix? Can we add some javadoc on that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Modified variable declaration:
`private final List<String> _tableNamesWithType;`
@ApiOperation(value = "Run controller periodic task against the specified table. If no table name is specified, task will run against all tables.") | ||
public boolean runPeriodicTask( | ||
@ApiParam(value = "Periodic Task Name", required = true) @QueryParam("taskname") String periodicTaskName, | ||
@ApiParam(value = "Table Name (with type)", required = false) @QueryParam("tablename") String tableName) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can it be a list of table names?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm... a good idea, but this entire thing is a manual process. It should be easy to (write a script to) invoke the api for a bunch of tables if needed. Instead of complicating the internals, my suggestion is to let the API support the two extreme cases -- one table, all tables.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, changed into list.
@ApiOperation(value = "Get comma-delimited list of all available periodic task names.") | ||
public ArrayList<String> getPeriodicTaskNames() { | ||
ArrayList<String> list = new ArrayList<>(); | ||
list.add(org.apache.pinot.controller.validation.BrokerResourceValidationManager.class.getSimpleName()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a variable called _tasksWithValidInterval
in PeriodicTaskScheduler class, sth you can refer to instead of keeping a list of controller names here, which is hard to maintain.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea. Done.
* Execute the task once. This method will calls the {@link #run} method. | ||
* @param filter An implementation specific string that may dictate how the task will be run. null by default. | ||
*/ | ||
void run(String filter); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we use Context object instead of a filter string? By using Context, it'll be easier to maintain and more flexible to extend.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Made changes (summarized below) based on our offline discussion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm otherwise
Response.Status.NOT_FOUND); | ||
} | ||
|
||
if (!_pinotHelixResourceManager.getAllRawTables().contains(tableName)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
table name is optional, so this could be null here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
|
||
private static Properties createTaskProperties(String periodicTaskReqeustId, String tableNameWithType) { | ||
Properties periodicTaskParameters = new Properties(); | ||
periodicTaskParameters.setProperty("requestid", periodicTaskReqeustId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let us put the keys as static strings
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
_running = true; | ||
|
||
String periodicTaskRequestId = | ||
_activePeriodicTaskProperties != null ? _activePeriodicTaskProperties.getProperty("requestid") : null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
import from a static string define instead of hard-coding "requestid"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
_running = false; | ||
@Override | ||
public void run(@Nullable java.util.Properties periodicTaskProperties) { | ||
Properties savedPeriodicTaskProperties = _activePeriodicTaskProperties; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But then it will be set to the value before locking. You need to get it after lock, and then restore it before releasing the lock.
pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskScheduler.java
Show resolved
Hide resolved
_activePeriodicTaskProperties != null ? _activePeriodicTaskProperties.getProperty(PeriodicTask.PROPERTY_KEY_REQUEST_ID) : null; | ||
if (_started) { | ||
long startTime = System.currentTimeMillis(); | ||
LOGGER.info("[TaskRequestId: {}] Start running task: {}", periodicTaskRequestId, _taskName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If periodicTaskId is null, we will see the log as [TaskRequestId: null]
. Not the best experience. Maybe set the requireId to be some specific value ("automatic" or "periodic execution" or some such, maybe?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Partially reviewed. Will finish the review by today
* the lead controller. The message is sent whenever API call for executing a PeriodicTask is invoked. | ||
*/ | ||
public class RunPeriodicTaskMessage extends Message { | ||
public static final String RUN_PERIODIC_TASK_MSG_SUB_TYPE = "PERIODIC_TASK"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(nit) RUN_PERIODIC_TASK
to be more specific?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
|
||
/** Message handler for {@link RunPeriodicTaskMessage} message. */ | ||
private static class RunPeriodicTaskMessageHandler extends MessageHandler { | ||
private final String _periodicTaskReqeustId; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo Request
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
* @param periodicTaskName Name of the task that will be run. | ||
* @param tableNameWithType Table (names with type suffix) on which task will run. | ||
*/ | ||
public RunPeriodicTaskMessage(@Nonnull String taskRequestId, @Nonnull String periodicTaskName, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's not use Nonnull
(assume arguments are non-null if not annotated), but mark Nullable
instead for nullable argument
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
public static final String RUN_PERIODIC_TASK_MSG_SUB_TYPE = "PERIODIC_TASK"; | ||
private static final String PERIODIC_TASK_REQUEST_ID = "taskRequestId"; | ||
private static final String PERIODIC_TASK_NAME_KEY = "periodicTaskName"; | ||
private static final String TABLE_NAME_WITH_TYPE_KEY = "tableNameWithType"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems all other messages put tableName
as the key, should we try to match them?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had this q before, but our periodic tasks take table name with a type, and not raw table name. We can have two independent keys if you like
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All the user-defined messages always have table name with type (resource name) but use key tableName
. I kind of prefer matching them, but either way is okay
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pending...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mcvsubbu @amrishlal Please take a look at SegmentRefreshMessage
and RoutingTableRebuildMessage
and see whether we want to match them
private static final String PERIODIC_TASK_REQUEST_ID = "taskRequestId"; | ||
private static final String PERIODIC_TASK_NAME_KEY = "periodicTaskName"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we simplify them to requestId
and taskName
?
Side question, is requestId
for logging and debugging purpose only?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For now, yes. In future, we can potentially add some tracking on the controllers that receive it, and retrieve something via controller APIs as to task status.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed.
|
||
public RunPeriodicTaskMessage(Message message) { | ||
super(message.getRecord()); | ||
String msgSubType = message.getMsgSubType(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed.
|
||
|
||
/** Factory class for creating message handlers for incoming helix messages. */ | ||
public class ControllerMessageHandlerFactory implements MessageHandlerFactory { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggest renaming it to ControllerUserDefinedMessageHandlerFactory
to be more specific
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Renamed.
@@ -28,6 +29,10 @@ | |||
@ThreadSafe | |||
public interface PeriodicTask extends Runnable { | |||
|
|||
// PeriodicTask objects may take a {@link Properties} object. Define all the keys property keys here. | |||
String PROPERTY_KEY_REQUEST_ID = "requestid"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we match these 2 keys with the ones we used in the message?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
} | ||
|
||
/** Execute {@link PeriodicTask} immediately on the specified table. */ | ||
public void scheduleNow(String periodicTaskName, @Nullable Properties periodicTaskProperties) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
properties
cannot be null
here, or it will throw NPE
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
} catch (Throwable t) { | ||
// catch all errors to prevent subsequent executions from being silently suppressed | ||
// Ref: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledExecutorService.html#scheduleWithFixedDelay-java.lang.Runnable-long-long-java.util.concurrent.TimeUnit- | ||
LOGGER.warn("[TaskRequestId: {}] Caught exception while attempting to execute named periodic task: {}", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel we should log error
instead
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
} | ||
|
||
/** @return List of tasks name that will run periodically. */ | ||
public List<String> getTaskNameList() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(nit) getTaskNames()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
} | ||
} catch (InterruptedException ie) { | ||
Thread.currentThread().interrupt(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please keep the error message
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
|
||
private volatile boolean _started; | ||
private volatile boolean _running; | ||
|
||
// Properties that task may use during execution. null by default. | ||
protected Properties _activePeriodicTaskProperties; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(Major)
Keep swapping the _activePeriodicTaskProperties
and expect the child class to use it to identify the table name is hard to manage.
We should add an abstract method runTask(Properties periodicTaskProperties)
and the child class can implement accordingly
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Amrish had previously done the approach of runTask(Properties)
. It did not seem best since each sub-task had to be modified.
I think things have changed enough that we can take one more look at that. @amrishlal maybe you can have a quick discussion with Jackie and point out the problems. I am fine with either approach.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can make it call runTask()
by default, and override it for ControllerPeriodicTask
. In ControllerPeriodicTask
, both of them should call processTables()
internally
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this is a multi-threaded class, we want to have all the threads that want to do the same thing (i.e run a task) pass through the same execution chain using the same state variables. Doing this will avoid a lot of confusion with respect to what thread is executing where and which state variables that thread is using and how and where locks are being acquired and released. The alternative (which I was trying earlier as Subbu mentioned) seemed to be leading to a situation where we would not only have multiple threads, but also multiple paths of execution (even if that path is just a few functions deep and converges later on) and multiple references to the same state variable (base class has _activePeriodicTaskProperties, do we pass that on the stack as well to child classes when it is already available to all child classes. If not, then why are we passing the "other" properties object on stack, etc). Basically, it was bringing up too many issues and ending up with making too many assumptions; hence, I abandoned the approach.
The main problem here is that we need to replace the active _activePeriodicTaskProperties object and that is really all that needs to be done to make the same code work for all threads who want to do the same thing, so the simplest way to do that seemed to be was to create a run(properties) method to swap out _activePeriodicTaskProperties under lock and then call run() method to let the rest of the code do what it is already doing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Logically they are 2 different tasks: one periodic trigger to process all tables lead by the controller; the other one adhoc to process based on the custom properties. The second one might only need to process one table, why mixing them and force the second one to scan potentially thousands of tables?
Also, I don't agree that for multi-threaded class, we want to follow the same execution chain and mixing the logics. It won't really help with the race condition handling, and more complicated logic within a single method can lead to more bugs.
} else { | ||
LOGGER.info("Task: {} is finished in {}ms", waitTimeMs); | ||
LOGGER.info("Task: {} finished within timeout of {}ms", MAX_PERIODIC_TASK_STOP_TIME_MILLIS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing _taskName
. Also would suggest put the actual waiting time instead of the timeout
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
@Override | ||
protected final void runTask() { | ||
_controllerMetrics.addMeteredTableValue(_taskName, ControllerMeter.CONTROLLER_PERIODIC_TASK_RUN, 1L); | ||
try { | ||
// Process the tables that are managed by this controller | ||
// TODO: creating tablesToProcess list below is redundant since processTables will unroll the list anyway. This |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't quite follow this TODO. tablesToProcess
gathers all the tables that are managed by this controller (this controller is the leader for these tables), and processTables()
does not perform leader check
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As a function, ControllerPeriodicTask.runTask()
doesn't do much. If it was merged with the ControllerPeriodicTask.processTables
function then there would be no need to create the tablesToProcess
array because processTable
could directly be called on the tables that pass the filter criteria.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We want to make them modular so that they can be reused. I agree current way will create one extra array, but that should not cause too much overhead (especially when not in performance critical path). With processTables()
separate from the runTask()
, we can override it (e.g. in PinotTaskManager
) or reuse it. In this PR, since we might only want to process one table, we should not call runTask()
and loop over all the tables (this overhead will be much bigger than creating an extra array), but just check if the target table is lead by the controller can call processTables()
on one single table.
List<String> tablesToProcess = new ArrayList<>(); | ||
for (String tableNameWithType : _pinotHelixResourceManager.getAllTables()) { | ||
if (_leadControllerManager.isLeaderForTable(tableNameWithType)) { | ||
if (shouldRunTaskForTable(tableNameWithType)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will be very inefficient when table name is already provided. We should only check for the provided table name instead.
If we have a separate method runTask(Properties periodicTaskProperties)
it will be much cleaner
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Refactored a bit to make it more efficient.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM otherwise
public static final String RUN_PERIODIC_TASK_MSG_SUB_TYPE = "PERIODIC_TASK"; | ||
private static final String PERIODIC_TASK_REQUEST_ID = "taskRequestId"; | ||
private static final String PERIODIC_TASK_NAME_KEY = "periodicTaskName"; | ||
private static final String TABLE_NAME_WITH_TYPE_KEY = "tableNameWithType"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mcvsubbu @amrishlal Please take a look at SegmentRefreshMessage
and RoutingTableRebuildMessage
and see whether we want to match them
} else { | ||
// Table name is available, so task should run only on the specified table. | ||
if (_leadControllerManager.isLeaderForTable(propTableNameWithType)) { | ||
tablesToProcess.add(propTableNameWithType); | ||
} | ||
} | ||
processTables(tablesToProcess); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only call processTables
when tablesToProcess
is not empty to avoid the overhead
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
// Default properties that tasks may use during execution. This variable is private and does not have any get or set | ||
// methods to prevent subclasses from gaining direct access to this variable. See run(Properties) method to see how | ||
// properties are passed and used during task execution. | ||
private Properties _activePeriodicTaskProperties; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make this a constant and rename it
private Properties _activePeriodicTaskProperties; | |
private static final Properties DEFAULT_PERIODIC_TASK_PROPERTIES; | |
static { | |
DEFAULT_PERIODIC_TASK_PROPERTIES = new Properties(); | |
DEFAULT_PERIODIC_TASK_PROPERTIES.put(PeriodicTask.PROPERTY_KEY_REQUEST_ID, DEFAULT_REQUEST_ID); | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
Description
This PR defines a REST API (see
/periodictask/run
inPinotControllerPeriodicTaskRestletResource.java
) to manually trigger a named periodic task. If a table name is provided with the API call, the task will only run against the specified table. If no table name is provided, task will run against all tables. Once triggered, the REST API call will send a Helix message (RunPeriodicTaskMessage.java
) to all controllers including itself. Any new user-defined helix message that is received by a controller is processed byControllerMessageHandlerFactory.java
which is registered for handling messages inBaseControllerStarter.java
. Upon getting the message,ControllerMessageHandlerFactory
will createRunPeriodicTaskMessageHandler
object to process the message (seeControllerMessageHandlerFactory.createHandler function
).RunPeriodicTaskMessageHandler
will then callPeriodicTaskScheduler
to execute the named task. To prevent thread conflict, additional synchornization was added toPeriodicTaskScheduler
so that a task does not run more than once at any given time. A test case was added toPeriodicTaskSchedulerTest.java
to verify this.Upgrade Notes
Does this PR prevent a zero down-time upgrade? (Assume upgrade order: Controller, Broker, Server, Minion)
backward-incompat
, and complete the section below on Release Notes)Does this PR fix a zero-downtime upgrade introduced earlier?
backward-incompat
, and complete the section below on Release Notes)Does this PR otherwise need attention when creating release notes? Things to consider:
release-notes
and complete the section on Release Notes)Release Notes
Documentation