Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.pinot.common.messages.TableConfigRefreshMessage;
import org.apache.pinot.common.utils.DatabaseUtils;
import org.apache.pinot.spi.config.workload.InstanceCost;
import org.apache.pinot.spi.trace.Tracing;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -268,20 +269,39 @@ public void onError(Exception e, ErrorCode code, ErrorType type) {
private static class QueryWorkloadRefreshMessageHandler extends MessageHandler {
final String _queryWorkloadName;
final InstanceCost _instanceCost;
final String _messageType;

QueryWorkloadRefreshMessageHandler(QueryWorkloadRefreshMessage queryWorkloadRefreshMessage,
NotificationContext context) {
super(queryWorkloadRefreshMessage, context);
_queryWorkloadName = queryWorkloadRefreshMessage.getQueryWorkloadName();
_instanceCost = queryWorkloadRefreshMessage.getInstanceCost();
_messageType = queryWorkloadRefreshMessage.getMsgSubType();
}

@Override
public HelixTaskResult handleMessage() {
// TODO: Add logic to invoke the query workload manager to refresh/delete the query workload config
HelixTaskResult result = new HelixTaskResult();
result.setSuccess(true);
return result;
LOGGER.info("Handling query workload message: {}", _message);
try {
if (_messageType.equals(QueryWorkloadRefreshMessage.DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE)) {
Tracing.ThreadAccountantOps.getWorkloadBudgetManager().deleteWorkload(_queryWorkloadName);
} else if (_messageType.equals(QueryWorkloadRefreshMessage.REFRESH_QUERY_WORKLOAD_MSG_SUB_TYPE)) {
if (_instanceCost == null) {
throw new IllegalStateException(
"Instance cost is not provided for refreshing query workload: " + _queryWorkloadName);
}
Tracing.ThreadAccountantOps.getWorkloadBudgetManager()
.addOrUpdateWorkload(_queryWorkloadName, _instanceCost.getCpuCostNs(), _instanceCost.getMemoryCostBytes());
} else {
throw new IllegalStateException("Unknown message type: " + _messageType);
}
HelixTaskResult result = new HelixTaskResult();
result.setSuccess(true);
return result;
} catch (Exception e) {
LOGGER.warn("Failed to handle query workload message: {}", _queryWorkloadName, e);
throw e;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public QueryWorkloadRefreshMessage(String queryWorkloadName, String messageSubTy
public QueryWorkloadRefreshMessage(Message message) {
super(message.getRecord());
if (!message.getMsgSubType().equals(REFRESH_QUERY_WORKLOAD_MSG_SUB_TYPE)
|| !message.getMsgSubType().equals(DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE)) {
&& !message.getMsgSubType().equals(DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE)) {
throw new IllegalArgumentException("Unknown message subtype:" + message.getMsgSubType());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -746,12 +746,6 @@ public void start()
new SegmentMessageHandlerFactory(instanceDataManager, serverMetrics);
_helixManager.getMessagingService()
.registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(), messageHandlerFactory);
// Query workload message handler factory
QueryWorkloadMessageHandlerFactory queryWorkloadMessageHandlerFactory =
new QueryWorkloadMessageHandlerFactory(serverMetrics);
_helixManager.getMessagingService()
.registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(),
queryWorkloadMessageHandlerFactory);

serverMetrics.addCallbackGauge(Helix.INSTANCE_CONNECTED_METRIC_NAME, () -> _helixManager.isConnected() ? 1L : 0L);
_helixManager.addPreConnectCallback(
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.pinot.common.Utils;
import org.apache.pinot.common.messages.ForceCommitMessage;
import org.apache.pinot.common.messages.IngestionMetricsRemoveMessage;
import org.apache.pinot.common.messages.QueryWorkloadRefreshMessage;
import org.apache.pinot.common.messages.SegmentRefreshMessage;
import org.apache.pinot.common.messages.SegmentReloadMessage;
import org.apache.pinot.common.messages.TableConfigSchemaRefreshMessage;
Expand All @@ -43,6 +44,8 @@
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.spi.config.workload.InstanceCost;
import org.apache.pinot.spi.trace.Tracing;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -77,6 +80,9 @@ public MessageHandler createHandler(Message message, NotificationContext context
return new IngestionMetricsRemoveMessageHandler(new IngestionMetricsRemoveMessage(message), _metrics, context);
case TableConfigSchemaRefreshMessage.REFRESH_TABLE_CONFIG_AND_SCHEMA:
return new TableSchemaRefreshMessageHandler(new TableConfigSchemaRefreshMessage(message), _metrics, context);
case QueryWorkloadRefreshMessage.REFRESH_QUERY_WORKLOAD_MSG_SUB_TYPE:
case QueryWorkloadRefreshMessage.DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE:
return new QueryWorkloadRefreshMessageHandler(new QueryWorkloadRefreshMessage(message), _metrics, context);
default:
LOGGER.warn("Unsupported user defined message sub type: {} for segment: {}", msgSubType,
message.getPartitionName());
Expand Down Expand Up @@ -273,6 +279,51 @@ public HelixTaskResult handleMessage() {
}
}

private static class QueryWorkloadRefreshMessageHandler extends DefaultMessageHandler {
final String _queryWorkloadName;
final InstanceCost _instanceCost;
final String _messageType;

QueryWorkloadRefreshMessageHandler(QueryWorkloadRefreshMessage queryWorkloadRefreshMessage,
ServerMetrics metrics, NotificationContext context) {
super(queryWorkloadRefreshMessage, metrics, context);
_queryWorkloadName = queryWorkloadRefreshMessage.getQueryWorkloadName();
_instanceCost = queryWorkloadRefreshMessage.getInstanceCost();
_messageType = queryWorkloadRefreshMessage.getMsgSubType();
}

@Override
public HelixTaskResult handleMessage() {
LOGGER.info("Handling query workload message: {}", _message);
try {
if (_messageType.equals(QueryWorkloadRefreshMessage.DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE)) {
Tracing.ThreadAccountantOps.getWorkloadBudgetManager().deleteWorkload(_queryWorkloadName);
} else if (_messageType.equals(QueryWorkloadRefreshMessage.REFRESH_QUERY_WORKLOAD_MSG_SUB_TYPE)) {
if (_instanceCost == null) {
throw new IllegalStateException(
"Instance cost is not provided for refreshing query workload: " + _queryWorkloadName);
}
Tracing.ThreadAccountantOps.getWorkloadBudgetManager()
.addOrUpdateWorkload(_queryWorkloadName, _instanceCost.getCpuCostNs(), _instanceCost.getMemoryCostBytes());
} else {
throw new IllegalStateException("Unknown message type: " + _messageType);
}
HelixTaskResult result = new HelixTaskResult();
result.setSuccess(true);
return result;
} catch (Exception e) {
LOGGER.warn("Failed to handle query workload message: {}", _queryWorkloadName, e);
throw e;
}
}

@Override
public void onError(Exception e, ErrorCode errorCode, ErrorType errorType) {
LOGGER.error("Got error while refreshing query workload config for query workload: {} (error code: {},"
+ " error type: {})", _queryWorkloadName, errorCode, errorType, e);
}
}

private static class DefaultMessageHandler extends MessageHandler {
final String _segmentName;
final String _tableNameWithType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class WorkloadBudgetManager {
private static final Logger LOGGER = LoggerFactory.getLogger(WorkloadBudgetManager.class);

private long _enforcementWindowMs;
private final ConcurrentHashMap<String, Budget> _workloadBudgets = new ConcurrentHashMap<>();
private ConcurrentHashMap<String, Budget> _workloadBudgets;
private final ScheduledExecutorService _resetScheduler = Executors.newSingleThreadScheduledExecutor();
private volatile boolean _isEnabled;

Expand All @@ -45,6 +45,7 @@ public WorkloadBudgetManager(PinotConfiguration config) {
LOGGER.info("WorkloadBudgetManager is disabled. Creating a no-op instance.");
return;
}
_workloadBudgets = new ConcurrentHashMap<>();
_enforcementWindowMs = config.getProperty(CommonConstants.Accounting.CONFIG_OF_WORKLOAD_ENFORCEMENT_WINDOW_MS,
CommonConstants.Accounting.DEFAULT_WORKLOAD_ENFORCEMENT_WINDOW_MS);
initSecondaryWorkloadBudget(config);
Expand Down Expand Up @@ -113,6 +114,15 @@ public void addOrUpdateWorkload(String workload, long cpuBudgetNs, long memoryBu
memoryBudgetBytes);
}

public void deleteWorkload(String workload) {
if (!_isEnabled) {
LOGGER.info("WorkloadBudgetManager is disabled. Not deleting workload: {}", workload);
return;
}
_workloadBudgets.remove(workload);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we initialize _workloadBudgets only if _isEnabled is true?
Also handle isEnabled in this function before removing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

was initially thinking of keeping delete lenient. For example, if the feature is disabled and we try to delete a workload, the operation wouldn’t succeed. However, since the state is in-memory, it would eventually get cleaned up on restart.

On second thought, it’s probably better to support delete only when the feature is enabled — it’s more useful when the feature is toggled on and off and on.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you also move the initialization of _workloadBudgets such that it's done after the isEnabled check?

LOGGER.info("Removed workload: {}", workload);
}

/**
* Attempts to charge CPU and memory usage against the workload budget (Thread-Safe).
* Returns the remaining budget for CPU and memory after charge.
Expand Down Expand Up @@ -161,8 +171,7 @@ public BudgetStats getRemainingBudgetAcrossAllWorkloads() {
* Periodically resets budgets at the end of each enforcement window (Thread-Safe).
*/
private void startBudgetResetTask() {
// TODO(Vivek): Reduce logging verbosity. Maybe make it debug logs.
LOGGER.info("Starting budget reset task with enforcement window: {}ms", _enforcementWindowMs);
LOGGER.debug("Starting budget reset task with enforcement window: {}ms", _enforcementWindowMs);
_resetScheduler.scheduleAtFixedRate(() -> {
LOGGER.debug("Resetting all workload budgets.");
// Also print the budget used in the last enforcement window.
Expand Down
Loading