-
Notifications
You must be signed in to change notification settings - Fork 2.3k
Description
Is your feature request related to a problem? Please describe
Create ingestion management APIs for pull-based ingestion for the following.
- Pause and resume ingestion.
- Update offset.
- Get ingestion state.
- Update error strategy (update_settings API will be used)
As a separate effort, we also plan to support custom consumer configurations and add an API to update consumer config.
Describe the solution you'd like
Following APIs will be created.
Pause Ingestion
POST /<index>/ingestion/_pause
Path parameters:
index: the index for which ingestion needs to be paused.
Description:
This API can be used to pause ingestion for a given index. Ingestion will be paused on all shards for the provided index. This API will update the cluster state to reflect ingestion pause and also run pause operation on each shard. The response will include request and shard level acknowledgements.
Response:
{
"acknowledged": true/false,
"shards_acknowledged": true/false,
"error": "error message if any",
"failures": {
"indexName": [
{
"shard": 0,
"error": "error message "
}
]
}
}
Resume Ingestion
POST /<index>/ingestion/_resume
Path parameters:
index: the index for which ingestion needs to be resumed.
Description:
This API can be used to resume ingestion for a given index. All shards for the provided index will be resumed. Optionally, a list of reset settings can be provided for a subset of shards. If reset settings are provided, the consumers will first be reset following which ingestion will be resumed. Ingestion will only be resumed if all consumers are successfully reset.
Resume operation will first update the cluster state following which each shard will be resumed. The response will indicate request and shard level acknowledgements.
Request:
{
"reset_settings": [
{
"shard": 0,
"mode": "offset/timestamp",
"value": "1"
}
]
}
Response:
{
"acknowledged": true/false,
"shards_acknowledged": true/false,
"error": "error message if any",
"failures": {
"indexName": [
{
"shard": 0,
"error": "error message "
}
]
}
}
Get Ingestion State
GET /<index>/ingestion/_state
Path parameters:
index: the index for which ingestion state needs to be returned.
shards: optional
Description:
This API returns the current state of ingestion for the provided index. Optionally, a list of shards can be provided.
This API supports pagination.
Response:
{
"_shards": {
"total": 1,
"successful": 1,
"failed": 0,
"failures": [
{
"shard": 0,
"index": "my-index",
"status": "INTERNAL_SERVER_ERROR",
"reason": {
"type": "timeout_exception",
"reason": "error message"
}
}
]
},
"next_page_token" : "page token if not on last page"
"ingestion_state": {
"indexName": [
{
"shard": 0,
"poller_state": "POLLING",
"error_policy": "DROP",
"poller_paused": false
}
]
}
}
Update Error Strategy
update_settings API will be used to update the error strategy. IndexSettingsHandler for “index.ingestion_source.error_strategy” will be registered in the IngestionEngine, which will update the error strategy in the poller and writer threads.
Related component
Indexing
Describe alternatives you've considered
No response
Additional context
No response