[ML] add new snapshot upgrader API for upgrading older snapshots #64665
Conversation
This new API provides a way for users to upgrade their own anomaly job model snapshots. To upgrade a snapshot the following is done:
- Open a native process given the job id and the desired snapshot id
- load the snapshot to the process
- write the snapshot again from the native task (now updated via the native process)

closes elastic#64154
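For orientation, this is roughly how the endpoint is invoked (a sketch using the low-level REST client; the job and snapshot ids are the example values from this thread, and `wait_for_completion` behaviour is discussed in the review below):

```java
import java.io.IOException;

import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

public class UpgradeSnapshotExample {
    // Sketch: upgrade one old model snapshot and block until the upgrade finishes.
    public static Response upgradeSnapshot(RestClient restClient) throws IOException {
        Request request = new Request("POST",
            "/_ml/anomaly_detectors/largish-kibana-sample-data/model_snapshots/1541587919/_upgrade");
        // Without wait_for_completion=true the call returns as soon as the task starts.
        request.addParameter("wait_for_completion", "true");
        return restClient.performRequest(request);
    }
}
```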
Pinging @elastic/ml-core (:ml)
There was one thing in this change that was concerning to me.
The model size stats seem to simply disappear when upgrading the snapshot. This is probably because `CResourceMonitor::createMemoryUsageReport` simply generates the report based on the current job usage (which is effectively nothing).
This strikes me as strange, as revert actually does change the model size stats for the job...
Here were my test results:
BEFORE

```json
"model_size_stats" : {
  "job_id" : "largish-kibana-sample-data",
  "result_type" : "model_size_stats",
  "model_bytes" : 4972992,
  "peak_model_bytes" : 2495692,
  "model_bytes_exceeded" : 0,
  "model_bytes_memory_limit" : 524288000,
  "total_by_field_count" : 130,
  "total_over_field_count" : 0,
  "total_partition_field_count" : 129,
  "bucket_allocation_failures_count" : 0,
  "memory_status" : "ok",
  "categorized_doc_count" : 0,
  "total_category_count" : 0,
  "frequent_category_count" : 0,
  "rare_category_count" : 0,
  "dead_category_count" : 0,
  "failed_category_count" : 0,
  "categorization_status" : "ok",
  "log_time" : 1604588763336,
  "timestamp" : 1607272200000
},
```
AFTER

```json
"model_size_stats": {
  "job_id": "largish-kibana-sample-data",
  "result_type": "model_size_stats",
  "model_bytes": 11954,
  "peak_model_bytes": 0,
  "model_bytes_exceeded": 0,
  "model_bytes_memory_limit": 524288000,
  "total_by_field_count": 130,
  "total_over_field_count": 0,
  "total_partition_field_count": 129,
  "bucket_allocation_failures_count": 0,
  "memory_status": "ok",
  "categorized_doc_count": 0,
  "total_category_count": 0,
  "frequent_category_count": 0,
  "rare_category_count": 0,
  "dead_category_count": 0,
  "failed_category_count": 0,
  "categorization_status": "ok",
  "log_time": 1604591566986,
  "timestamp": 1607272200000
},
```
There might need to be changes on the C++ side so that model size stats are not regenerated on snapshot save (especially when foreground persistence is used).
@droberts195 let me know what you think.
```java
String snapshotId = "1541587919";

createModelSnapshot(jobId, snapshotId, Version.V_7_0_0);
//TODO add a true state from the past somehow
```
I am not 100% sure how to do this. Right now this test effectively just checks that the parameters are parsed and sent, as the resulting error indicates that we at least tried to load the model snapshot.
The solution might be to get an actual snapshot and then manually update the doc so that the `min_version` is old.
> The solution might be to get an actual snapshot and then manually update the doc so that the `min_version` is old.
Yes, in terms of testing the infrastructure that would be a good way. Run a simple job like farequote, update the model snapshot document after the job is closed to have a `min_version` from the previous major, then upgrade it. Not sure this needs to be done in the HLRC tests though; for such a complex test the native multi-node tests seem like the single place to do it. I am happy to leave this test as-is, just testing the parameter passing.

In terms of testing the actual upgrade, it could be done in the BWC tests. We could have a BWC test (Java, not YAML) that does nothing when the old cluster is on the same major, but when the old cluster is on a different major it opens/runs/closes a job in the old cluster, then upgrades its model snapshot in the fully upgraded cluster (and does nothing in the mixed cluster).
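A hedged sketch of that document tweak (the results-index alias, the snapshot document id format, and the helper name are assumptions for illustration, not taken from this PR):

```java
import java.io.IOException;

import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;

public class AgeSnapshotForTest {
    // After the job is closed, rewrite the snapshot doc so it looks like it was
    // written by the previous major. Index alias and doc id format are assumptions.
    public static void ageSnapshot(RestHighLevelClient client, String jobId, String snapshotId)
            throws IOException {
        UpdateRequest update = new UpdateRequest(
            ".ml-anomalies-" + jobId,                 // assumed results index alias
            jobId + "_model_snapshot_" + snapshotId)  // assumed snapshot doc id
            .doc("min_version", "6.4.0");
        client.update(update, RequestOptions.DEFAULT);
    }
}
```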
yeah, I have a BWC test class covering this case.
```java
    super(NAME, Response::new);
}

public static class Request extends MasterNodeRequest<Request> implements ToXContentObject {
```
This is a master node action as we always want the latest cluster state information.
```java
public enum SnapshotUpgradeState implements Writeable {

    READING_NEW_STATE, STOPPED, FAILED;
```
I didn't opt for a "writing_old_state" or an "opened" state as neither really conveyed any information. If the state is `null`, we know that either the task is not assigned to a node, or it is assigned and still loading the old snapshot.
Once we are in `reading_new_state`, that indicates we have reached the point of no return, and any failure from that state indicates a corrupted job model snapshot.
`reading_new_state` is very much from the perspective of the Java code rather than the end user. As an end user who doesn't even know that the code is split between Java and C++, I would have thought `writing_new_state` makes more sense. Or `saving_new_state` would be a compromise that makes sense to both end users and Java developers.

I would also introduce a `reading_old_state` or `loading_old_state` enum value that can be used in stats and API responses instead of `null`. We went through that cycle with job states. Initially there was no `opening` state, because a `null` task state basically meant that. But then we found it was nicer to have a specific enum value for it and translate `null` to that enum value in some places. Even if it's not used anywhere initially, adding it to the enum from the outset will avoid BWC code later.
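A minimal sketch of that suggestion (the value names and the `fromNullable` helper are illustrative, not code from this PR):

```java
import java.io.IOException;

import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;

public enum SnapshotUpgradeState implements Writeable {
    // LOADING_OLD_STATE stands in for the "task assigned but nothing reported
    // yet" case that a null task state represents today.
    LOADING_OLD_STATE, SAVING_NEW_STATE, STOPPED, FAILED;

    // Translate the null task state so stats and API responses never surface null.
    public static SnapshotUpgradeState fromNullable(SnapshotUpgradeState state) {
        return state == null ? LOADING_OLD_STATE : state;
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        out.writeEnum(this);
    }
}
```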
```diff
@@ -678,8 +684,8 @@ protected Clock getClock() {
         }
     } else {
         mlController = new DummyController();
-        autodetectProcessFactory = (job, autodetectParams, executorService, onProcessCrash) ->
-            new BlackHoleAutodetectProcess(job.getId(), onProcessCrash);
+        autodetectProcessFactory = (pipelineId, job, autodetectParams, executorService, onProcessCrash) ->
```
`pipelineId` is for renaming the resulting file pipeline. See the comments below for further explanation.
```java
if (state.nodes().getMaxNodeVersion().after(state.nodes().getMinNodeVersion())) {
    listener.onFailure(ExceptionsHelper.conflictStatusException(
        "Cannot upgrade job [{}] snapshot [{}] as not all nodes are on version {}. All nodes must be the same version",
```
This just eliminates the small edge case of requiring job node assignment to take node version into account. These processes are short-lived, and restricting the cluster to not be a mixed cluster is a sane limitation, especially since this API is meant to be used right before upgrading to the next major version.
```java
public AutodetectProcess createAutodetectProcess(String pipelineId,
                                                 Job job,
```
One of the requirements was that this upgrade be possible WHILE the referenced job is running. Consequently, the snapshot upgrade task and the job task COULD be assigned to the same node. If the pipeline ID was not given directly, this would cause a file name conflict.

Admittedly, there is already this "unique pipeline flag", but that is a `long` value. I thought it would be nice to include the snapshot ID directly in the pipeline name. It makes it very easy to investigate snapshot upgrader issues in the resulting logs by looking for `<job_id>-<snapshot_id>`.
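Roughly the naming idea (a sketch; the exact concatenation is an assumption):

```java
// Embedding both ids keeps the upgrader's named pipes distinct from those of a
// concurrently running job task on the same node, and makes log lines easy to grep.
String pipelineId = jobId + "-" + snapshotId;
```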
```java
 * <p>
 * This is a single purpose result processor and only handles snapshot writes
 */
public class JobSnapshotUpgraderResultProcessor {
```
I created a new processor here as I didn't want to chance ANY other result being written back. This just protects us from inadvertently updating the job results/state when we didn't mean to.
I possibly could have had an `AbstractResultProcessor` class, but the shared code was so small that it didn't really seem worth it.
```java
if (persistentTask == null) {
    isCompleted = true;
    return true;
}
```
In all my testing, the task is only `null` when it has been removed from cluster state. Since this predicate runs AFTER we have confirmed the task has been added to state (via the start task API), it is safe to assume that `null` means removal and thus completion.
```java
if (SnapshotUpgradeState.READING_NEW_STATE.equals(jobState)) {
    deleteSnapshotAndFailTask(task, params.getJobId(), params.getSnapshotId());
    return;
```
If we are assigned to a new node while in `reading_new_state`, the snapshot could be corrupted since the files are being overwritten one by one.
Consequently, we audit, log, and then delete the snapshot, as it is unusable anyway.
It MIGHT be better to add a flag to the snapshot that says "bad snapshot". But then the way to recover would be to delete the job model state and restore from an Elasticsearch snapshot... Up for debate.
```java
List<ModelSnapshot> snapshots = getModelSnapshots(job.getId(), snapshot.getSnapshotId()).snapshots();
assertThat(snapshots, hasSize(1));
assertThat(snapshot.getLatestRecordTimeStamp(), equalTo(snapshots.get(0).getLatestRecordTimeStamp()));

// Does the snapshot still work?
assertThat(hlrc.getJobStats(new GetJobStatsRequest(JOB_ID), RequestOptions.DEFAULT)
        .jobStats()
        .get(0)
        .getDataCounts().getLatestRecordTimeStamp(),
    greaterThan(snapshot.getLatestRecordTimeStamp()));
```
After backport I want to add a mixed cluster test (to make sure the mixed-node error throws), and I want to verify that the `min_version` is updated on the new snapshot.
Right now, since 8.x does not support upgrades from 6.x, that is not possible here. But in 7.x it will be good to test that `min_version` gets adjusted.
After this is merged and backported, another API should be added to check the stats of snapshot upgrades, and the deprecation API should be changed to report when a snapshot is too old to run in the next major.
I agree there need to be changes on the C++ side. @edsavage please could you investigate those two things.
Thanks for writing this extremely complicated yet also tedious functionality.
There's a lot of code so I haven't reviewed it all in detail, but have left an initial set of comments. The biggest one is that we need to think about how to avoid excessive complexity in the Kibana migration assistant that will have to use this code eventually.
```java
UpgradeJobModelSnapshotResponse response = client.machineLearning().upgradeJobSnapshot(request, RequestOptions.DEFAULT);
// end::upgrade-job-model-snapshot-execute
} catch (ElasticsearchException ex) {
    // TODO have a true snapshot in the past to upgrade?
```
As above, this will be complex and expensive, and I am not sure that is justified for the docs tests. We can do it once as part of the native multi-node tests, but burning that CPU many times in a full CI run seems unjustified.
yeah, I have a BWC test class covering this case.
In that case I think you should remove the TODO from here and instead have a comment to say that this is just checking syntax because actual upgrade is covered elsewhere.
```asciidoc
--------------------------------------------------
<1> The job that owns the snapshot
<2> The snapshot id to upgrade
<3> The time out of the request
```
As an end user I would be interested in what this means if `wait_for_completion=false`, and what the default is.
```java
try {
    return PARSER.parse(parser, null);
} catch (IOException e) {
    throw new RuntimeException(e);
```
Did you consider `UncheckedIOException`?
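For comparison, the suggested alternative might look like this (a sketch mirroring the quoted snippet; `PARSER`, `Request`, and the method name stand in for the surrounding class):

```java
import java.io.IOException;
import java.io.UncheckedIOException;

public static Request fromXContent(XContentParser parser) {
    try {
        return PARSER.parse(parser, null);
    } catch (IOException e) {
        // Keeps the IO nature of the failure visible to callers and in stack
        // traces, unlike a bare RuntimeException.
        throw new UncheckedIOException(e);
    }
}
```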
```java
if (response.result.getMinVersion().major >= UPGRADE_FROM_MAJOR) {
    listener.onFailure(ExceptionsHelper.conflictStatusException(
        "Cannot upgrade job [{}] snapshot [{}] as it is already compatible with current major version {}",
        request.getJobId(),
        request.getSnapshotId(),
        UPGRADE_FROM_MAJOR));
    return;
}
```
I would do the check on the exact version rather than just the major. Although it shouldn't be necessary, upgrading the format from e.g. 7.0 format to 7.11 format might be a useful piece of functionality to have in the future to work around some other bug.
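A sketch of that exact-version comparison, mirroring the quoted snippet (the error message and the use of `Version.CURRENT` as the reference point are illustrative):

```java
// Illustrative: reject only snapshots already written in the current version's
// format, so that e.g. a 7.0-format snapshot could still be re-written by 7.11.
if (response.result.getMinVersion().onOrAfter(Version.CURRENT)) {
    listener.onFailure(ExceptionsHelper.conflictStatusException(
        "Cannot upgrade job [{}] snapshot [{}] as it was already written by version {}",
        request.getJobId(),
        request.getSnapshotId(),
        response.result.getMinVersion()));
    return;
}
```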
```java
listener.onFailure(ExceptionsHelper.conflictStatusException(
    "Cannot upgrade snapshot [{}] for job [{}] as it is the current primary job snapshot",
    request.getSnapshotId(),
    request.getJobId()
));
```
This means extra complication for the Kibana upgrade assistant though. For every model snapshot that exists that is too old it will now have to recommend one of two possible courses of action, depending on whether the snapshot is the active one or not. Opening and closing a job normally without sending it any data doesn't rewrite the snapshot, so the user would also have to feed some data to actually change the active model snapshot of the job. So I think this check should be altered to only ban upgrading the active snapshot if the job is open.
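A sketch of the altered check (names mirror the quoted snippet; `jobState` is an assumed local holding the job's current task state):

```java
// Only ban upgrading the active snapshot while the job is open; for a closed
// job the active snapshot can safely be re-written in place.
if (jobState == JobState.OPENED) {
    listener.onFailure(ExceptionsHelper.conflictStatusException(
        "Cannot upgrade snapshot [{}] for job [{}] as it is the current snapshot of an open job",
        request.getSnapshotId(),
        request.getJobId()));
    return;
}
```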
```java
    return fieldIndexes;
}

void writeHeader() throws IOException {
```
I am not surprised you had to write a header. You could probably get away with writing one with just the control field (field name `.`). But it's not particularly important, so I'm happy to leave what's here.
LGTM
I noticed a few more minor things but am happy to merge this once they're resolved.
```java
@@ -244,4 +244,15 @@ public void writeStartBackgroundPersistMessage() throws IOException {
        fillCommandBuffer();
        lengthEncodedWriter.flush();
    }

    public void writeStartBackgroundPersistMessage(long snapshotTimestamp, String snapshotId, String description) throws IOException {
```
Please add a Javadoc comment to say whether `snapshotTimestamp` is in epoch millis or epoch seconds. Also, it might be worth adding `Seconds` or `Millis` to the variable name to make it ultra clear which it is for future maintainers.
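Something along these lines (a sketch of the suggestion; the rename and Javadoc wording are illustrative, and the body is elided):

```java
/**
 * Writes a control message asking the process to persist state in the
 * background, even if it is unchanged.
 *
 * @param snapshotTimestampSeconds the snapshot timestamp in epoch SECONDS;
 *                                 the C++ process expects seconds, not millis
 */
public void writeStartBackgroundPersistMessage(long snapshotTimestampSeconds,
                                               String snapshotId,
                                               String description) throws IOException {
    // ... body unchanged from the PR ...
}
```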
```java
@@ -36,6 +36,15 @@
 */
void persistState() throws IOException;

/**
 * Ask the process to persist state, even if it is unchanged.
 * @param snapshotTimestamp The snapshot timestamp
```
Please add whether this is epoch seconds or epoch millis.
```java
// C++ is expecting the timestamp to be in seconds, not Milliseconds
params.modelSnapshot().getTimestamp().getTime()/1000,
```
Since Java rarely uses epoch seconds, it's probably better to move the `/1000` closer to the point of passing the information to the C++ process, e.g. in `AutodetectControlMsgWriter`.
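That is, keep millis in the Java layer and convert at the boundary (a sketch; the body is elided):

```java
// In AutodetectControlMsgWriter (illustrative): callers pass idiomatic epoch
// millis, and the conversion to the seconds the C++ side expects happens at
// the last possible moment.
public void writeStartBackgroundPersistMessage(long snapshotTimestampMillis,
                                               String snapshotId,
                                               String description) throws IOException {
    long snapshotTimestampSeconds = snapshotTimestampMillis / 1000;
    // ... write the control message using snapshotTimestampSeconds ...
}
```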
```java
        bulkResultsPersister.executeRequest();
    }
} catch (Exception e) {
    LOGGER.warn(new ParameterizedMessage("[{}] Error persisting autodetect results", jobId), e);
```
```suggestion
LOGGER.warn(new ParameterizedMessage("[{}] Error persisting model snapshot [{}] upgrade results", jobId, snapshotId), e);
```
```java
// that it would have been better to close jobs before shutting down,
// but we now fully expect jobs to move between nodes without doing
// all their graceful close activities.
LOGGER.warn("[{}] some results not processed due to the process being killed", jobId);
```
LOGGER.warn("[{}] some results not processed due to the process being killed", jobId); | |
LOGGER.warn("[{}] some model snapshot [{}] upgrade results not processed due to the process being killed", jobId, snapshotId); |
LOGGER.warn("[{}] some results not processed due to the process being killed", jobId); | ||
} else if (process.isProcessAliveAfterWaiting() == false) { | ||
// Don't log the stack trace to not shadow the root cause. | ||
LOGGER.warn("[{}] some results not processed due to the termination of autodetect", jobId); |
LOGGER.warn("[{}] some results not processed due to the termination of autodetect", jobId); | |
LOGGER.warn("[{}] some model snapshot [{}] upgrade results not processed due to the termination of autodetect", jobId, snapshotId); |
```java
} else {
    // We should only get here if the iterator throws in which
    // case parsing the autodetect output has failed.
    LOGGER.error(new ParameterizedMessage("[{}] error parsing autodetect output", jobId), e);
```
```suggestion
LOGGER.error(new ParameterizedMessage("[{}] error parsing autodetect output during model snapshot [{}] upgrade", jobId, snapshotId), e);
```
```java
if (isAlive() == false) {
    throw e;
}
LOGGER.warn(new ParameterizedMessage("[{}] Error processing autodetect result", jobId), e);
```
```suggestion
LOGGER.warn(new ParameterizedMessage("[{}] Error processing autodetect result during model snapshot [{}] upgrade", jobId, snapshotId), e);
```
```java
private void logUnexpectedResult(String resultType) {
    LOGGER.info("[{}] [{}] unexpected result read [{}]", jobId, snapshotId, resultType);
}
```
Consider adding `assert resultType == null` or something else that will detect if this happens during our integration tests.
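A sketch of that suggestion; `assert false` here is equivalent in effect, tripping whenever the method is called with assertions enabled (as in integration tests):

```java
private void logUnexpectedResult(String resultType) {
    // Integration tests run with -ea, so this fails the test outright;
    // in production it degrades to the log line below.
    assert false : "[" + jobId + "] [" + snapshotId + "] unexpected result read [" + resultType + "]";
    LOGGER.info("[{}] [{}] unexpected result read [{}]", jobId, snapshotId, resultType);
}
```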
[ML] add new snapshot upgrader API for upgrading older snapshots (elastic#64665)

This new API provides a way for users to upgrade their own anomaly job model snapshots. To upgrade a snapshot the following is done:
- Open a native process given the job id and the desired snapshot id
- load the snapshot to the process
- write the snapshot again from the native task (now updated via the native process)

relates elastic#64154
When a persist control message with arguments is received by the anomaly detector, it doesn't go through the standard chain of persistence calls: it unconditionally rewrites the state (even if no data has been seen) and includes only the anomaly detector state, omitting the categorizer state. Because of this, the memory usage was not being recalculated prior to persisting the state as would normally happen. This PR rectifies that omission. Fixes one of the problems detailed in elastic/elasticsearch#64665 (review)
[ML] add new snapshot upgrader API for upgrading older snapshots (#64665) (#65010)

This new API provides a way for users to upgrade their own anomaly job model snapshots. To upgrade a snapshot the following is done:
- Open a native process given the job id and the desired snapshot id
- load the snapshot to the process
- write the snapshot again from the native task (now updated via the native process)

relates #64154
When a persist control message with arguments is received by the anomaly detector, it doesn't go through the standard chain of persistence calls: it unconditionally rewrites the state (even if no data has been seen) and includes only the anomaly detector state, omitting the categorizer state. Because of this, the memory usage was not being recalculated prior to persisting the state as would normally happen. This PR rectifies that omission. Fixes one of the problems detailed in elastic/elasticsearch#64665 (review)

Backport of elastic#1585