Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.common.minion;

import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.pinot.spi.utils.JsonUtils;


/**
* Base abstract class for task generator info.
*/
public abstract class BaseTaskGeneratorInfo {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want to add implements Serializable

/**
* @return task type
*/
public abstract String getTaskType();

/**
* @return task generator info as a Json string
*/
public String toJsonString() {
try {
return JsonUtils.objectToString(this);
} catch (JsonProcessingException e) {
throw new IllegalStateException(e);
}
}

@Override
public String toString() {
return toJsonString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

format: move this header to top

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack

* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pinot.common.minion;

import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

public class InMemoryTaskManagerStatusCache implements TaskManagerStatusCache<TaskGeneratorMostRecentRunInfo> {

private static class TaskGeneratorCacheKey {
final String _tableNameWithType;
final String _taskType;

private TaskGeneratorCacheKey(String tableNameWithType, String taskType) {
_tableNameWithType = tableNameWithType;
_taskType = taskType;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
TaskGeneratorCacheKey that = (TaskGeneratorCacheKey) o;
return _tableNameWithType.equals(that._tableNameWithType) && _taskType.equals(that._taskType);
}

@Override
public int hashCode() {
return Objects.hash(_tableNameWithType, _taskType);
}
}

private final ConcurrentHashMap<TaskGeneratorCacheKey, TaskGeneratorMostRecentRunInfo> _cacheMap;

public InMemoryTaskManagerStatusCache() {
_cacheMap = new ConcurrentHashMap<>();
}

@Override
public TaskGeneratorMostRecentRunInfo fetchTaskGeneratorInfo(String tableNameWithType, String taskType) {
return _cacheMap.get(new TaskGeneratorCacheKey(tableNameWithType, taskType));
}

@Override
public void saveTaskGeneratorInfo(String tableNameWithType, String taskType,
Consumer<TaskGeneratorMostRecentRunInfo> taskGeneratorMostRecentRunInfoUpdater) {
_cacheMap.compute(new TaskGeneratorCacheKey(tableNameWithType, taskType), (k, v) -> {
if (v == null) {
v = TaskGeneratorMostRecentRunInfo.newInstance(tableNameWithType, taskType);
}
taskGeneratorMostRecentRunInfoUpdater.accept(v);
return v;
});
}

@Override
public void deleteTaskGeneratorInfo(String tableNameWithType, String taskType) {
_cacheMap.remove(new TaskGeneratorCacheKey(tableNameWithType, taskType));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.common.minion;

import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import com.google.common.annotations.VisibleForTesting;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.TreeMap;
import java.util.stream.Collectors;


/**
* a task generator running history which keeps the most recent several success run timestamp and the most recent
* several error run messages.
*/
@JsonPropertyOrder({"tableNameWithType", "taskType", "mostRecentSuccessRunTS", "mostRecentErrorRunMessages"})
public class TaskGeneratorMostRecentRunInfo extends BaseTaskGeneratorInfo {
@VisibleForTesting
static final int MAX_NUM_OF_HISTORY_TO_KEEP = 5;
private final String _taskType;
private final String _tableNameWithType;
// the timestamp to error message map of the most recent several error runs
private final TreeMap<Long, String> _mostRecentErrorRunMessages;
// the timestamp of the most recent several success runs
private final List<Long> _mostRecentSuccessRunTS;

private TaskGeneratorMostRecentRunInfo(String tableNameWithType, String taskType) {
_tableNameWithType = tableNameWithType;
_taskType = taskType;
_mostRecentErrorRunMessages = new TreeMap<>();
_mostRecentSuccessRunTS = new ArrayList<>();
}

/**
* Returns the table name with type
*/
public String getTableNameWithType() {
return _tableNameWithType;
}

@Override
public String getTaskType() {
return _taskType;
}

/**
* Gets the timestamp to error message map of the most recent several error runs
*/
public TreeMap<String, String> getMostRecentErrorRunMessages() {
TreeMap<String, String> result = new TreeMap<>();
_mostRecentErrorRunMessages.forEach((timestamp, error) -> result.put(
OffsetDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneOffset.UTC).toString(),
error));
return result;
}

/**
* Adds an error run message
* @param ts A timestamp
* @param message An error message.
*/
public void addErrorRunMessage(long ts, String message) {
_mostRecentErrorRunMessages.put(ts, message);
if (_mostRecentErrorRunMessages.size() > MAX_NUM_OF_HISTORY_TO_KEEP) {
_mostRecentErrorRunMessages.remove(_mostRecentErrorRunMessages.firstKey());
}
}

/**
* Gets the timestamp of the most recent several success runs
*/
public List<String> getMostRecentSuccessRunTS() {
return _mostRecentSuccessRunTS.stream().map(
timestamp -> OffsetDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneOffset.UTC)
.toString())
.collect(Collectors.toList());
}

/**
* Adds a success task generating run timestamp
* @param ts A timestamp
*/
public void addSuccessRunTs(long ts) {
_mostRecentSuccessRunTS.add(ts);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Q: any need to keep _mostRecentSuccessRunTS sorted?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Simplifies the eviction is all. Also the APi response is easier to work with (the most recent timestamp is the one of the top)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this indicate a successful run (completion) of the task, or a successful generation of the task? If the task fails in minion, does your API address it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This only indicates the successful generation of a the task. This API is exclusively for task generation

if (_mostRecentSuccessRunTS.size() > MAX_NUM_OF_HISTORY_TO_KEEP) {
// sort first in case the given timestamp is not the largest one.
Collections.sort(_mostRecentSuccessRunTS);
_mostRecentSuccessRunTS.remove(0);
}
}

/**
* Creates a new empty {@link TaskGeneratorMostRecentRunInfo}
* @param tableNameWithType the table name with type
* @param taskType the task type.
* @return a new empty {@link TaskGeneratorMostRecentRunInfo}
*/
public static TaskGeneratorMostRecentRunInfo newInstance(String tableNameWithType, String taskType) {
return new TaskGeneratorMostRecentRunInfo(tableNameWithType, taskType);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.common.minion;

import java.util.function.Consumer;


public interface TaskManagerStatusCache<T extends BaseTaskGeneratorInfo> {

T fetchTaskGeneratorInfo(String tableNameWithType, String taskType);

void saveTaskGeneratorInfo(String tableNameWithType, String taskType,
Consumer<T> taskGeneratorMostRecentRunInfoUpdater);

void deleteTaskGeneratorInfo(String tableNameWithType, String taskType);
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.metrics.ValidationMetrics;
import org.apache.pinot.common.minion.InMemoryTaskManagerStatusCache;
import org.apache.pinot.common.minion.TaskGeneratorMostRecentRunInfo;
import org.apache.pinot.common.minion.TaskManagerStatusCache;
import org.apache.pinot.common.utils.ServiceStartableUtils;
import org.apache.pinot.common.utils.ServiceStatus;
import org.apache.pinot.common.utils.TlsUtils;
Expand Down Expand Up @@ -148,6 +151,7 @@ public abstract class BaseControllerStarter implements ServiceStartable {
protected RetentionManager _retentionManager;
protected SegmentStatusChecker _segmentStatusChecker;
protected PinotTaskManager _taskManager;
protected TaskManagerStatusCache<TaskGeneratorMostRecentRunInfo> _taskManagerStatusCache;
protected PeriodicTaskScheduler _periodicTaskScheduler;
protected PinotHelixTaskResourceManager _helixTaskResourceManager;
protected PinotRealtimeSegmentManager _realtimeSegmentsManager;
Expand Down Expand Up @@ -457,6 +461,7 @@ protected void configure() {
bind(_helixTaskResourceManager).to(PinotHelixTaskResourceManager.class);
bind(_segmentCompletionManager).to(SegmentCompletionManager.class);
bind(_taskManager).to(PinotTaskManager.class);
bind(_taskManagerStatusCache).to(TaskManagerStatusCache.class);
bind(connectionManager).to(HttpConnectionManager.class);
bind(_executorService).to(Executor.class);
bind(_controllerMetrics).to(ControllerMetrics.class);
Expand Down Expand Up @@ -629,13 +634,18 @@ public ControllerConf.ControllerMode getControllerMode() {
return _controllerMode;
}

protected TaskManagerStatusCache<TaskGeneratorMostRecentRunInfo> getTaskManagerStatusCache() {
return new InMemoryTaskManagerStatusCache();
}

@VisibleForTesting
protected List<PeriodicTask> setupControllerPeriodicTasks() {
LOGGER.info("Setting up periodic tasks");
List<PeriodicTask> periodicTasks = new ArrayList<>();
_taskManagerStatusCache = getTaskManagerStatusCache();
_taskManager =
new PinotTaskManager(_helixTaskResourceManager, _helixResourceManager, _leadControllerManager, _config,
_controllerMetrics);
_controllerMetrics, _taskManagerStatusCache);
periodicTasks.add(_taskManager);
_retentionManager =
new RetentionManager(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics);
Expand Down
Loading