[RFC] Streaming Index API #3000
Comments
I'd like to restate the problem a little bit to help me understand better:
Is that right? I think a clearer description of the use cases being targeted here would help us define the semantics of the API. |
I think it's crazy that ES (and now OS) does not offer a streaming indexing API. The bulk indexing API is horrible -- clients must figure out 1) how many docs to include in every bulk request (incurring possibly high latency if docs/sec is not so high), 2) how many concurrent bulk requests to send to maximize indexing throughput, 3) handle resulting heap/OOME issues, and 4) then handle the horrific

A streaming indexing API would empower the server to pull docs off the wire into the "right" sized chunks/concurrency based on available indexing and heap resources, would allow simple back-pressure if the requested indexing rate is too high for the cluster resources, and finally would enable a very simple durability model (when client is done sending the streaming docs, when they see the ACK at the end, the changes are durable).

We should deprecate the poorly designed bulk indexing API and switch to a simpler/less risky streaming indexing API! |
Here is a proposal for the above-mentioned RFC.

Current State

The OpenSearch

A bulk request contains operations intended for multiple indices and shards. Once a bulk request is received by the coordinating node, it
Streaming Mode

The bulk API assumes data is bounded and available in batches. In reality, a lot of data is unbounded because it arrives gradually over time. With the bulk API, the user must artificially divide the data into chunks of fixed size. However, there is no clear way to know the right chunk size without experimenting.

With streaming mode, we can send a stream of data records using the bulk API over a single HTTP connection. To enable streaming mode, we will leverage Chunked Transfer Encoding to establish a persistent connection, and OpenSearch will accept the records in order as long as the connection is open. For each record, OpenSearch will asynchronously send a response to the client in the same order.

In streaming mode, the API can be called with an additional header "Transfer-Encoding: chunked" on the initial request. For example, to establish a connection,
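As a rough illustration of the wire format this relies on: HTTP/1.1 chunked transfer encoding frames each chunk as a hexadecimal length, CRLF, the payload, and CRLF, and a zero-length chunk ends the body. The sketch below shows only that framing. The helper names `encode_chunk` and `end_of_stream` and the sample records are assumptions made for illustration, not part of the proposal or of any OpenSearch API.

```python
def encode_chunk(payload: bytes) -> bytes:
    # One chunk on the wire: <length in hex> CRLF <payload> CRLF
    if not payload:
        raise ValueError("a zero-length chunk would terminate the stream")
    return f"{len(payload):x}".encode("ascii") + b"\r\n" + payload + b"\r\n"


def end_of_stream() -> bytes:
    # The terminating zero-length chunk, with no trailer fields
    return b"0\r\n\r\n"


# Hypothetical records; each one travels as its own chunk.
records = [b'{"title": "first"}\n', b'{"title": "second"}\n']
wire = b"".join(encode_chunk(r) for r in records) + end_of_stream()
```

In practice an HTTP client library would normally produce this framing itself when given a streaming request body; the point here is only that each record can be delimited on an open connection without knowing the total length up front.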
Once the stream has been established, we can start sending multiple requests on the same stream. The existing bulk API uses the newline-delimited JSON (NDJSON) structure:
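For reference, a minimal sketch of how a client might assemble that NDJSON body today: one action-and-metadata line per operation, followed by a source line where the action carries a document. The function name `to_bulk_ndjson` and the sample index name and documents are made up for illustration.

```python
import json


def to_bulk_ndjson(operations):
    """Serialize (action, metadata, source) triples into a bulk NDJSON body."""
    lines = []
    for action, metadata, source in operations:
        # Action and metadata line, e.g. {"index": {"_index": "logs", "_id": "1"}}
        lines.append(json.dumps({action: metadata}))
        # Source line; a delete operation carries no source
        if source is not None:
            lines.append(json.dumps(source))
    # The body must end with a newline
    return "\n".join(lines) + "\n"


body = to_bulk_ndjson([
    ("index", {"_index": "logs", "_id": "1"}, {"message": "started"}),
    ("delete", {"_index": "logs", "_id": "2"}, None),
])
```

Because one operation can span two lines, a reader has to keep state across lines to know whether the next line is an action or a source.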
In the streaming mode, we would want to combine the action, metadata and the optional source into a single structure to stream it atomically. For example,
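One possible shape for such a combined record, sketched under assumptions: the field names `action`, `metadata`, and `source` and both helper functions are hypothetical and only illustrate the idea of one self-contained JSON object per streamed record. They are not a proposed or existing format.

```python
import json


def to_stream_record(action, metadata, source=None):
    """Pack one operation into a single self-contained JSON line (hypothetical shape)."""
    record = {"action": action, "metadata": metadata}
    if source is not None:
        record["source"] = source
    return json.dumps(record) + "\n"


def from_stream_record(line):
    """Unpack a line produced by to_stream_record."""
    record = json.loads(line)
    return record["action"], record["metadata"], record.get("source")


line = to_stream_record("index", {"_index": "logs", "_id": "1"}, {"message": "started"})
```

With this shape every line can be parsed and handled on its own, which is what streaming a record atomically would require.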
However, this would be a breaking change to the bulk API; the alternative would be to introduce a new API.

Refresh Policy

In OpenSearch, a refresh operation makes the changes to shards available for search. OpenSearch provides the dynamic setting

With the bulk API, we can pass a URL parameter

In streaming mode, we may want to reconsider whether to provide this control to the client.

Durability Levels

Currently, OpenSearch uses a transaction log to store index and delete operations until they are committed to the Lucene index. OpenSearch, through the
With streaming mode, we may want to keep the same durability mechanism, and this can be taken care of as part of the pluggable translog feature proposal.

HTTP Versions

Chunked Transfer Encoding is only available in
Persistent Connection

If the client doesn’t close the connection once all the data is streamed, resources on the server are wasted, so we may need to consider closing the connection after a certain client idle timeout.

Compression

Currently, the requests to OpenSearch and responses to the client can be gzip-compressed for optimization. With the streaming mode enabled, the individual requests cannot be compressed.

Next steps

Based on the feedback from this RFC, we will work on the design (link to be added) for the above solution. |
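To make the idle timeout idea from the Persistent Connection section a bit more concrete, here is a minimal sketch of per-connection idle tracking. The class name `IdleTracker`, the 30 second value, and the injectable clock are all assumptions for illustration; how a real server would hook this into its connection handling is left open.

```python
import time


class IdleTracker:
    """Tracks the time since the last record arrived on one connection."""

    def __init__(self, timeout_s, clock=time.monotonic):
        self.timeout_s = timeout_s
        self._clock = clock
        self._last_activity = clock()

    def record_activity(self):
        # Call whenever a record (or any bytes) arrives from the client
        self._last_activity = self._clock()

    def is_idle(self):
        # True once the client has been silent for at least timeout_s
        return self._clock() - self._last_activity >= self.timeout_s


# Usage with a fake clock so the behaviour is easy to follow
now = [0.0]
tracker = IdleTracker(timeout_s=30.0, clock=lambda: now[0])
now[0] = 29.0
still_open = not tracker.is_idle()   # client silent for 29s: keep the connection
now[0] = 30.0
should_close = tracker.is_idle()     # client silent for 30s: candidate for closing
```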
@adnapibar thanks a lot for formalizing the proposal. I was thinking about a possible streaming mode implementation, and I think Chunked Transfer Encoding may not be what we should be looking at. To be clear: chunked transfer does work, and even back pressure should be feasible to support, but what does not work for bulk ingestion is error propagation. Let me elaborate.

Let us assume that the client establishes the stream and starts sending bulk requests. At some point, the server may need to shed load and stop ingestion, including the bulk streams. How do we communicate that back to the client?
HTTP/2 and/or WebSockets could be superior options to explore in this case: the client streams bulk requests while the server communicates back the status of the stream processing.
I am curious how an asynchronous response for each record could be implemented specifically with chunked transfer encoding: AFAIK most clients and servers would not deal with the response before finishing with the request, even if both request and response are chunked. Thank you. |
@reta Thanks for the feedback. I was suggesting that each chunk be an individual operation that can be handled atomically by the server, with a corresponding response sent using chunked encoding. It doesn't have the same semantics as the bulk request, which handles batches of operations. While there are many implementation details that I'm unaware of at this point, I would appreciate hearing about drawbacks and alternative mechanisms. |
I agree with @reta about HTTP/2 and/or WebSockets. When I originally read this issue, that is what I immediately thought of. With 2-way communication it would be easier for OpenSearch to notify the client of its state, i.e. low heap, full queue, etc. |
+1, the persistence of a connection is not really the problem (it can be achieved with keep-alive today); it's the need to reason about the size of bulk requests to avoid sending too many requests. Also, while it's not possible to follow the progress of each separate request in the bulk API (that would be streaming), I think we should also consider possible ways to improve upon that, e.g. #3625 |
The Kafka protocol is perhaps a good example to look at. It is a fully pipeline-able 2-way streaming API which is what I was thinking of here. The key points are that clients can (and should) use non-blocking I/O to implement request pipelining and that a single client should rarely if ever need to implement connection pooling to send requests across multiple connections. However, Kafka defines a completely hand-rolled binary format on a plain TCP connection which puts a lot of implementation burden on clients, which is probably not the right thing to do given that there are now streaming options available like HTTP/2 and WebSockets. |
We have to provide an API over HTTP and not only with clients for high level programming languages. I agree that HTTP/2 is a superior option but we don't have support for it yet. @andrross Kafka also provides a streaming REST API over HTTP using the same mechanism that I proposed earlier. |
Opened a separate issue to add support for HTTP/2 #3651 |
I think the streaming index API should be a new API. Like segment replication it should start as experimental behind a feature flag so we can benchmark default parameters and API semantics before promoting it as a first class bulk ingest mechanism. As you touch on in the
This defaults to |
I agree that it should be a new API, as it allows you to pack in more optimizations that are natively closer to the streaming paradigm and would be difficult to implement in the existing synchronous bulk API, especially with all its durability options. I would also think about how we could stream directly to the data node instead of having the coordinator node split the requests in between. This would probably require us to vend clients with some intelligence about routing? I have been thinking about whether we should explore a more performant XContent parser than JSON for high-throughput writes, and a streaming API fits pretty well there. Instead of just using existing bulk formats, we could implement a more performant wire protocol along with this API.
You need to be more careful about this change as it introduces much more duplicate processing with every document parsed. |
I concur with the idea of having a separate API to revisit our freshness and durability semantics and pack in optimizations as needed. I guess the network infrastructure/firewall would potentially limit how long the connection can stay open; this should also factor in inevitable cases where connections have to be forcibly closed, like server maintenance. Do we also plan on supporting a client library that ensures persistent connections (keep-alive), closes the connection at end of stream, provides a backup buffering mechanism if the server isn't able to process as fast (closing the connection if the buffer hits a certain limit), and reconnects on connection drops?
@itiyama I think we could have the coordinator split the streams for parallel processing and fan them out to the respective shards as needed, or even consider having a single stream always write to a single shard if there are overheads with splitting them |
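A rough sketch of the coordinator-side fan-out idea, under stated assumptions: `zlib.crc32` is only a deterministic stand-in here and is not OpenSearch's actual routing hash, and the function names `shard_for` and `fan_out` and the sample documents are hypothetical.

```python
import zlib
from collections import defaultdict


def shard_for(routing_key: str, num_shards: int) -> int:
    # Stand-in hash for illustration only; real routing uses a different function
    return zlib.crc32(routing_key.encode("utf-8")) % num_shards


def fan_out(records, num_shards):
    """Group (doc_id, source) pairs by target shard, keeping arrival order per shard."""
    per_shard = defaultdict(list)
    for doc_id, source in records:
        per_shard[shard_for(doc_id, num_shards)].append((doc_id, source))
    return dict(per_shard)


incoming = [(f"doc-{i}", {"n": i}) for i in range(10)]
batches = fan_out(incoming, num_shards=3)
```

Each per-shard list could then be forwarded as its own sub-stream. A stream that always targets a single shard would skip this grouping step entirely, which is the trade-off raised above.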
@reta do we want to create a meta issue and start documenting the client breaking changes to help drive a bwc / 3.0 release discussion for supporting this feature? |
@nknize very likely so. I will work on POCs over the next week to outline the implementation choices, then we could bundle that into a meta issue weighing the breaking changes, thanks! |
@reta @nknize, I have created a GitHub project for better tracking and visibility: https://github.com/orgs/opensearch-project/projects/116/views/1. Feel free to add all related issues to this board and we can track it from there. |
I hope we do not expose cluster details like "heap is low", "too many concurrent requests", etc., through this new API. Rather, the cluster should pull from the stream at the rate it is able to given the load and free resources in the cluster. Probably the first POC should build on

If the client is sending more docs per second than the cluster can handle, just let the standard TCP level networking back-pressure stall the incoming bytes. The client in turn will see that the socket is blocking on trying to write too many bytes/second. In this new streaming indexing API, clients should never see rejections.

I don't think we should enable fancy durability options to begin. Start simple: nothing is durable until the client closes the connection and gets the final ACK / TCP close success back from the server.

Likewise, let's not tie refresh to this new API either. If security use-cases really need this, then they should call

Similarly, let's not introduce transactional semantics for the first go. I.e. the ongoing periodic refreshes (1s by default) will expose any recently indexed documents via this new streaming API for searching. |
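The pull-based back-pressure described above can be imitated in-process with a bounded queue. This is only an analogy: the queue stands in for the TCP socket buffer, `run_stream` and its parameters are hypothetical names, and no real networking or indexing happens.

```python
import asyncio


async def run_stream(num_docs: int, max_in_flight: int):
    # The bounded queue plays the role of the socket buffer between client and server
    queue = asyncio.Queue(maxsize=max_in_flight)
    indexed = []
    peak = 0

    async def client():
        nonlocal peak
        for i in range(num_docs):
            # put() suspends while the queue is full, like a blocked socket write
            await queue.put({"id": i})
            peak = max(peak, queue.qsize())
        await queue.put(None)  # end of stream

    async def server():
        while True:
            doc = await queue.get()
            if doc is None:
                return
            await asyncio.sleep(0)  # stand-in for indexing work
            indexed.append(doc["id"])

    await asyncio.gather(client(), server())
    return indexed, peak


indexed, peak = asyncio.run(run_stream(num_docs=100, max_in_flight=8))
```

The client never receives a rejection; it simply cannot get ahead of the server by more than `max_in_flight` documents, and every document is eventually processed in order.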
Thanks a lot @mikemccand, your comments are very much aligned with what we have discussed as the way forward. I intend to capture these details in the implementation proposal (next week or so). Thank you again. |
Why does this feature entail a breaking change? It seems capable of existing in the 2.x line. |
HTTP/2 streaming OpenSearch APIs will need to be added to the clients in order to leverage the streaming index API. This will require an upgrade to both clients and servers. @reta I think that can be done in a backward compatible way, but it would require supporting both HTTP/1 and HTTP/2, and I'm not sure there's a good understanding yet of that blast radius and what tech debt will continue to accrue. We already have so much as it is. Given that, along with the server and client upgrades, I wonder if we just box this to 3.0 only and release when we have a scheduled date for 3.0? |
I also want to point out that we intend for this to be the transport replacement for both |
@nknize "yes" for server side but "no" for clients - the existing 2.x clients (rest / RHLC) are built on top of Apache Http Client 4.x and it does not support HTTP/2. |
Is your feature request related to a problem? Please describe.
The current _bulk indexing API places a high configuration burden on users today to avoid RejectedExecutionException due to TOO_MANY_REQUESTS. This forces the user to "experiment" with bulk block sizes, multi-threading, refresh intervals, etc.

Describe the solution you'd like

The _bulk configuration burden and workflow should be relocated from the user and handled by the server. The user experience should switch to an anxiety-free API that enables users to send a "stream" of index requests that is load balanced by the server in a Streaming Index mechanism.
This Streaming Index API mechanism should also handle the "durability" responsibility based on a user defined Durability Policy to determine the following:
Describe alternatives you've considered
Continue w/ durability as it is today w/ a document replication model.