Skip to content

[Feature Request] Ingestion management APIs for pull-based ingestion #17442

@varunbharadwaj

Description

@varunbharadwaj

Is your feature request related to a problem? Please describe

Create ingestion management APIs for pull-based ingestion for the following.

  1. Pause and resume ingestion.
  2. Update offset.
  3. Get ingestion state.
  4. Update error strategy (update_settings API will be used)

As a separate effort, we also plan to support custom consumer configurations and add an API to update consumer config.

Describe the solution you'd like

Following APIs will be created.

Pause Ingestion

POST /<index>/ingestion/_pause

Path parameters:
index: the index for which ingestion needs to be paused.

Description:
This API can be used to pause ingestion for a given index. Ingestion will be paused on all shards for the provided index. This API will update the cluster state to reflect ingestion pause and also run pause operation on each shard. The response will include request and shard level acknowledgements.

Response:

{
    "acknowledged": true/false,
    "shards_acknowledged": true/false,
    "error": "error message if any",
    "failures": {
        "indexName": [
            {
                "shard": 0,
                "error": "error message "
            }
        ]
    }
}

Resume Ingestion

POST /<index>/ingestion/_resume

Path parameters:
index: the index for which ingestion needs to be resumed.

Description:
This API can be used to resume ingestion for a given index. All shards for the provided index will be resumed. Optionally, a list of reset settings can be provided for a subset of shards. If reset settings are provided, the consumers will first be reset following which ingestion will be resumed. Ingestion will only be resumed if all consumers are successfully reset.

Resume operation will first update the cluster state following which each shard will be resumed. The response will indicate request and shard level acknowledgements.

Request:

{
  "reset_settings": [
    {
       "shard": 0,
       "mode": "offset/timestamp",
       "value": "1"
    }
  ]
}

Response:

{
    "acknowledged": true/false,
    "shards_acknowledged": true/false,
    "error": "error message if any",
    "failures": {
        "indexName": [
            {
                "shard": 0,
                "error": "error message "
            }
        ]
    }
}

Get Ingestion State

GET /<index>/ingestion/_state

Path parameters:
index: the index for which ingestion state needs to be returned.
shards: optional

Description:
This API returns the current state of ingestion for the provided index. Optionally, a list of shards can be provided.
This API supports pagination.

Response:

{
    "_shards": {
        "total": 1,
        "successful": 1,
        "failed": 0,
        "failures": [
            {
                "shard": 0,
                "index": "my-index",
                "status": "INTERNAL_SERVER_ERROR",
                "reason": {
                    "type": "timeout_exception",
                    "reason": "error message"
                }
            }
        ]
    },
    "next_page_token" : "page token if not on last page"
    "ingestion_state": {
        "indexName": [
            {
                "shard": 0,
                "poller_state": "POLLING",
                "error_policy": "DROP",
                "poller_paused": false
            }
        ]
    }
}

Update Error Strategy

update_settings API will be used to update the error strategy. IndexSettingsHandler for “index.ingestion_source.error_strategy” will be registered in the IngestionEngine, which will update the error strategy in the poller and writer threads.

Related component

Indexing

Describe alternatives you've considered

No response

Additional context

No response

Metadata

Metadata

Assignees

No one assigned

    Labels

    IndexingIndexing, Bulk Indexing and anything related to indexingenhancementEnhancement or improvement to existing feature or request

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions