-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Task genrator debug api #9058
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Task genrator debug api #9058
Changes from all commits
0186642
ae25c2e
4336064
da71ba8
86b14a3
29d1e2f
1177190
af6b497
79a7ed1
efd4dfe
9e33055
35d736f
a5fe721
18b03d3
8c5456a
4980e0d
aa80094
009ba40
0d23c99
ec3c848
08d9c6a
f7a95b9
9fa0078
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,49 @@ | ||
| /** | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.pinot.common.minion; | ||
|
|
||
| import com.fasterxml.jackson.core.JsonProcessingException; | ||
| import org.apache.pinot.spi.utils.JsonUtils; | ||
|
|
||
|
|
||
| /** | ||
| * Base abstract class for task generator info. | ||
| */ | ||
| public abstract class BaseTaskGeneratorInfo { | ||
| /** | ||
| * @return task type | ||
| */ | ||
| public abstract String getTaskType(); | ||
|
|
||
| /** | ||
| * @return task generator info as a Json string | ||
| */ | ||
| public String toJsonString() { | ||
| try { | ||
| return JsonUtils.objectToString(this); | ||
| } catch (JsonProcessingException e) { | ||
| throw new IllegalStateException(e); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public String toString() { | ||
| return toJsonString(); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,82 @@ | ||
| /** | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
|
||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package org.apache.pinot.common.minion; | ||
|
|
||
| import java.util.Objects; | ||
| import java.util.concurrent.ConcurrentHashMap; | ||
| import java.util.function.Consumer; | ||
|
|
||
| public class InMemoryTaskManagerStatusCache implements TaskManagerStatusCache<TaskGeneratorMostRecentRunInfo> { | ||
|
|
||
| private static class TaskGeneratorCacheKey { | ||
| final String _tableNameWithType; | ||
| final String _taskType; | ||
|
|
||
| private TaskGeneratorCacheKey(String tableNameWithType, String taskType) { | ||
| _tableNameWithType = tableNameWithType; | ||
| _taskType = taskType; | ||
| } | ||
|
|
||
| @Override | ||
| public boolean equals(Object o) { | ||
| if (this == o) { | ||
| return true; | ||
| } | ||
| if (o == null || getClass() != o.getClass()) { | ||
| return false; | ||
| } | ||
| TaskGeneratorCacheKey that = (TaskGeneratorCacheKey) o; | ||
| return _tableNameWithType.equals(that._tableNameWithType) && _taskType.equals(that._taskType); | ||
| } | ||
|
|
||
| @Override | ||
| public int hashCode() { | ||
| return Objects.hash(_tableNameWithType, _taskType); | ||
| } | ||
| } | ||
|
|
||
| private final ConcurrentHashMap<TaskGeneratorCacheKey, TaskGeneratorMostRecentRunInfo> _cacheMap; | ||
|
|
||
| public InMemoryTaskManagerStatusCache() { | ||
| _cacheMap = new ConcurrentHashMap<>(); | ||
| } | ||
|
|
||
| @Override | ||
| public TaskGeneratorMostRecentRunInfo fetchTaskGeneratorInfo(String tableNameWithType, String taskType) { | ||
| return _cacheMap.get(new TaskGeneratorCacheKey(tableNameWithType, taskType)); | ||
| } | ||
|
|
||
| @Override | ||
| public void saveTaskGeneratorInfo(String tableNameWithType, String taskType, | ||
| Consumer<TaskGeneratorMostRecentRunInfo> taskGeneratorMostRecentRunInfoUpdater) { | ||
| _cacheMap.compute(new TaskGeneratorCacheKey(tableNameWithType, taskType), (k, v) -> { | ||
| if (v == null) { | ||
| v = TaskGeneratorMostRecentRunInfo.newInstance(tableNameWithType, taskType); | ||
| } | ||
| taskGeneratorMostRecentRunInfoUpdater.accept(v); | ||
| return v; | ||
| }); | ||
| } | ||
|
|
||
| @Override | ||
| public void deleteTaskGeneratorInfo(String tableNameWithType, String taskType) { | ||
| _cacheMap.remove(new TaskGeneratorCacheKey(tableNameWithType, taskType)); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,122 @@ | ||
| /** | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.pinot.common.minion; | ||
|
|
||
| import com.fasterxml.jackson.annotation.JsonPropertyOrder; | ||
| import com.google.common.annotations.VisibleForTesting; | ||
| import java.time.Instant; | ||
| import java.time.OffsetDateTime; | ||
| import java.time.ZoneOffset; | ||
| import java.util.ArrayList; | ||
| import java.util.Collections; | ||
| import java.util.List; | ||
| import java.util.TreeMap; | ||
| import java.util.stream.Collectors; | ||
|
|
||
|
|
||
| /** | ||
| * a task generator running history which keeps the most recent several success run timestamp and the most recent | ||
| * several error run messages. | ||
| */ | ||
| @JsonPropertyOrder({"tableNameWithType", "taskType", "mostRecentSuccessRunTS", "mostRecentErrorRunMessages"}) | ||
| public class TaskGeneratorMostRecentRunInfo extends BaseTaskGeneratorInfo { | ||
| @VisibleForTesting | ||
| static final int MAX_NUM_OF_HISTORY_TO_KEEP = 5; | ||
| private final String _taskType; | ||
| private final String _tableNameWithType; | ||
| // the timestamp to error message map of the most recent several error runs | ||
| private final TreeMap<Long, String> _mostRecentErrorRunMessages; | ||
| // the timestamp of the most recent several success runs | ||
| private final List<Long> _mostRecentSuccessRunTS; | ||
|
|
||
| private TaskGeneratorMostRecentRunInfo(String tableNameWithType, String taskType) { | ||
| _tableNameWithType = tableNameWithType; | ||
| _taskType = taskType; | ||
| _mostRecentErrorRunMessages = new TreeMap<>(); | ||
| _mostRecentSuccessRunTS = new ArrayList<>(); | ||
| } | ||
|
|
||
| /** | ||
| * Returns the table name with type | ||
| */ | ||
| public String getTableNameWithType() { | ||
| return _tableNameWithType; | ||
| } | ||
|
|
||
| @Override | ||
| public String getTaskType() { | ||
| return _taskType; | ||
| } | ||
|
|
||
| /** | ||
| * Gets the timestamp to error message map of the most recent several error runs | ||
| */ | ||
| public TreeMap<String, String> getMostRecentErrorRunMessages() { | ||
| TreeMap<String, String> result = new TreeMap<>(); | ||
| _mostRecentErrorRunMessages.forEach((timestamp, error) -> result.put( | ||
| OffsetDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneOffset.UTC).toString(), | ||
| error)); | ||
| return result; | ||
| } | ||
|
|
||
| /** | ||
| * Adds an error run message | ||
| * @param ts A timestamp | ||
| * @param message An error message. | ||
| */ | ||
| public void addErrorRunMessage(long ts, String message) { | ||
| _mostRecentErrorRunMessages.put(ts, message); | ||
| if (_mostRecentErrorRunMessages.size() > MAX_NUM_OF_HISTORY_TO_KEEP) { | ||
| _mostRecentErrorRunMessages.remove(_mostRecentErrorRunMessages.firstKey()); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Gets the timestamp of the most recent several success runs | ||
| */ | ||
| public List<String> getMostRecentSuccessRunTS() { | ||
| return _mostRecentSuccessRunTS.stream().map( | ||
| timestamp -> OffsetDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneOffset.UTC) | ||
| .toString()) | ||
| .collect(Collectors.toList()); | ||
| } | ||
|
|
||
| /** | ||
| * Adds a success task generating run timestamp | ||
| * @param ts A timestamp | ||
| */ | ||
| public void addSuccessRunTs(long ts) { | ||
| _mostRecentSuccessRunTS.add(ts); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Q: any need to keep _mostRecentSuccessRunTS sorted?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Simplifies the eviction is all. Also the APi response is easier to work with (the most recent timestamp is the one of the top)
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this indicate a successful run (completion) of the task, or a successful generation of the task? If the task fails in minion, does your API address it?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This only indicates the successful generation of a the task. This API is exclusively for task generation |
||
| if (_mostRecentSuccessRunTS.size() > MAX_NUM_OF_HISTORY_TO_KEEP) { | ||
| // sort first in case the given timestamp is not the largest one. | ||
| Collections.sort(_mostRecentSuccessRunTS); | ||
| _mostRecentSuccessRunTS.remove(0); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Creates a new empty {@link TaskGeneratorMostRecentRunInfo} | ||
| * @param tableNameWithType the table name with type | ||
| * @param taskType the task type. | ||
| * @return a new empty {@link TaskGeneratorMostRecentRunInfo} | ||
| */ | ||
| public static TaskGeneratorMostRecentRunInfo newInstance(String tableNameWithType, String taskType) { | ||
| return new TaskGeneratorMostRecentRunInfo(tableNameWithType, taskType); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,32 @@ | ||
| /** | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.pinot.common.minion; | ||
|
|
||
| import java.util.function.Consumer; | ||
|
|
||
|
|
||
| public interface TaskManagerStatusCache<T extends BaseTaskGeneratorInfo> { | ||
|
|
||
| T fetchTaskGeneratorInfo(String tableNameWithType, String taskType); | ||
|
|
||
| void saveTaskGeneratorInfo(String tableNameWithType, String taskType, | ||
| Consumer<T> taskGeneratorMostRecentRunInfoUpdater); | ||
|
|
||
| void deleteTaskGeneratorInfo(String tableNameWithType, String taskType); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you want to add
implements Serializable